add exclude_keys option to KeyValueProcessor (#24876)

and modify data-structure of `include_keys` and `exclude_keys` to be
backed by a HashSet
This commit is contained in:
Tal Levy 2017-06-05 14:12:48 -07:00 committed by GitHub
parent e98d5676b3
commit e51246023a
4 changed files with 65 additions and 19 deletions

View File

@ -1621,6 +1621,7 @@ For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`
| `value_split` | yes | - | Regex pattern to use for splitting the key from the value within a key-value pair
| `target_field` | no | `null` | The field to insert the extracted keys into. Defaults to the root of the document
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
| `exclude_keys` | no | `null` | List of keys to exclude from document
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
|======

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
@ -26,8 +27,10 @@ import org.elasticsearch.ingest.Processor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
@ -39,18 +42,20 @@ public final class KeyValueProcessor extends AbstractProcessor {
private final String field;
private final String fieldSplit;
private final String valueSplit;
private final List<String> includeKeys;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final boolean ignoreMissing;
KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, List<String> includeKeys,
String targetField, boolean ignoreMissing) {
KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
Set<String> excludeKeys, String targetField, boolean ignoreMissing) {
super(tag);
this.field = field;
this.targetField = targetField;
this.fieldSplit = fieldSplit;
this.valueSplit = valueSplit;
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;
this.ignoreMissing = ignoreMissing;
}
@ -66,10 +71,14 @@ public final class KeyValueProcessor extends AbstractProcessor {
return valueSplit;
}
List<String> getIncludeKeys() {
Set<String> getIncludeKeys() {
return includeKeys;
}
Set<String> getExcludeKeys() {
return excludeKeys;
}
String getTargetField() {
return targetField;
}
@ -105,7 +114,9 @@ public final class KeyValueProcessor extends AbstractProcessor {
}
return kv;
})
.filter((p) -> includeKeys == null || includeKeys.contains(p[0]))
.filter((p) ->
(includeKeys == null || includeKeys.contains(p[0])) &&
(excludeKeys == null || excludeKeys.contains(p[0]) == false))
.forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
}
@ -122,12 +133,18 @@ public final class KeyValueProcessor extends AbstractProcessor {
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
List<String> includeKeys = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
if (includeKeys != null) {
includeKeys = Collections.unmodifiableList(includeKeys);
Set<String> includeKeys = null;
Set<String> excludeKeys = null;
List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
if (includeKeysList != null) {
includeKeys = Collections.unmodifiableSet(Sets.newHashSet(includeKeysList));
}
List<String> excludeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "exclude_keys");
if (excludeKeysList != null) {
excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, targetField, ignoreMissing);
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing);
}
}
}

View File

@ -21,9 +21,11 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -58,6 +60,7 @@ public class KeyValueProcessorFactoryTests extends ESTestCase {
config.put("value_split", "=");
config.put("target_field", "target");
config.put("include_keys", Arrays.asList("a", "b"));
config.put("exclude_keys", Collections.emptyList());
config.put("ignore_missing", true);
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, config);
@ -65,7 +68,8 @@ public class KeyValueProcessorFactoryTests extends ESTestCase {
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), equalTo(Arrays.asList("a", "b")));
assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b")));
assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet()));
assertThat(processor.getTargetField(), equalTo("target"));
assertTrue(processor.isIgnoreMissing());
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
@ -36,7 +37,7 @@ public class KeyValueProcessorTests extends ESTestCase {
public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
@ -45,7 +46,7 @@ public class KeyValueProcessorTests extends ESTestCase {
public void testRootTarget() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "myField", "&", "=", null, null, false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "myField", "&", "=", null, null,null, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("first", String.class), equalTo("hello"));
assertThat(ingestDocument.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe")));
@ -54,7 +55,7 @@ public class KeyValueProcessorTests extends ESTestCase {
public void testKeySameAsSourceField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
ingestDocument.setFieldValue("first", "first=hello");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "first", "&", "=", null, null, false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "first", "&", "=", null, null,null, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello")));
}
@ -63,15 +64,38 @@ public class KeyValueProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=",
Collections.singletonList("first"), "target", false);
Sets.newHashSet("first"), null, "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
assertFalse(ingestDocument.hasField("target.second"));
}
public void testExcludeKeys() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=",
null, Sets.newHashSet("second"), "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
assertFalse(ingestDocument.hasField("target.second"));
}
public void testIncludeAndExcludeKeys() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument,
"first=hello&second=world&second=universe&third=bar");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=",
Sets.newHashSet("first", "second"), Sets.newHashSet("first", "second"), "target", false);
processor.execute(ingestDocument);
assertFalse(ingestDocument.hasField("target.first"));
assertFalse(ingestDocument.hasField("target.second"));
assertFalse(ingestDocument.hasField("target.third"));
}
public void testMissingField() {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "&",
"=", null, null, "target", false);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
}
@ -81,7 +105,7 @@ public class KeyValueProcessorTests extends ESTestCase {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap(fieldName, null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "", "", null, "target", true);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "", "", null, null, "target", true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}
@ -89,7 +113,7 @@ public class KeyValueProcessorTests extends ESTestCase {
public void testNonExistentWithIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "", "", null, "target", true);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "", "", null, null, "target", true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}
@ -97,7 +121,7 @@ public class KeyValueProcessorTests extends ESTestCase {
public void testFailFieldSplitMatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello|second=world|second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello|second=world|second=universe"));
assertFalse(ingestDocument.hasField("target.second"));
@ -105,7 +129,7 @@ public class KeyValueProcessorTests extends ESTestCase {
public void testFailValueSplitMatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("foo", "bar"));
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "foo", "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "foo", "&", "=", null, null, "target", false);
Exception exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [foo] does not contain value_split [=]"));
}