Merge pull request #15968 from talevy/ingest-dedot-processor

[Ingest] Introduce DeDot Processor
This commit is contained in:
Tal Levy 2016-01-15 11:37:10 -08:00
commit 7c4f874ec8
7 changed files with 279 additions and 1 deletions

View File

@ -463,7 +463,7 @@ public final class IngestDocument {
/**
* Returns the document including its metadata fields, unless {@link #extractMetadata()} has been called, in which case the
* metadata fields will not be present anymore. Should be used only for reading.
* metadata fields will not be present anymore.
* Modify the document instead using {@link #setFieldValue(String, Object)} and {@link #removeField(String)}
*/
public Map<String, Object> getSourceAndMetadata() {

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Processor that replaces dots in document field names with a
* specified separator.
*/
public class DeDotProcessor implements Processor {
public static final String TYPE = "dedot";
static final String DEFAULT_SEPARATOR = "_";
private final String separator;
public DeDotProcessor(String separator) {
this.separator = separator;
}
public String getSeparator() {
return separator;
}
@Override
public void execute(IngestDocument document) {
deDot(document.getSourceAndMetadata());
}
@Override
public String getType() {
return TYPE;
}
/**
* Recursively iterates through Maps and Lists in search of map entries with
* keys containing dots. The dots in these fields are replaced with {@link #separator}.
*
* @param obj The current object in context to be checked for dots in its fields.
*/
private void deDot(Object obj) {
if (obj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> doc = (Map) obj;
Iterator<Map.Entry<String, Object>> it = doc.entrySet().iterator();
Map<String, Object> deDottedFields = new HashMap<>();
while (it.hasNext()) {
Map.Entry<String, Object> entry = it.next();
deDot(entry.getValue());
String fieldName = entry.getKey();
if (fieldName.contains(".")) {
String deDottedFieldName = fieldName.replaceAll("\\.", separator);
deDottedFields.put(deDottedFieldName, entry.getValue());
it.remove();
}
}
doc.putAll(deDottedFields);
} else if (obj instanceof List) {
@SuppressWarnings("unchecked")
List<Object> list = (List) obj;
list.forEach(this::deDot);
}
}
public static class Factory implements Processor.Factory<DeDotProcessor> {
@Override
public DeDotProcessor create(Map<String, Object> config) throws Exception {
String separator = ConfigurationUtils.readOptionalStringProperty(config, "separator");
if (separator == null) {
separator = DEFAULT_SEPARATOR;
}
return new DeDotProcessor(separator);
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.DeDotProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
@ -77,6 +78,7 @@ public class NodeModule extends AbstractModule {
registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory());
registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new);
registerProcessor(DeDotProcessor.TYPE, (templateService) -> new DeDotProcessor.Factory());
}
@Override

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class DeDotProcessorFactoryTests extends ESTestCase {
private DeDotProcessor.Factory factory;
@Before
public void init() {
factory = new DeDotProcessor.Factory();
}
public void testCreate() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("separator", "_");
DeDotProcessor deDotProcessor = factory.create(config);
assertThat(deDotProcessor.getSeparator(), equalTo("_"));
}
public void testCreateMissingSeparatorField() throws Exception {
Map<String, Object> config = new HashMap<>();
DeDotProcessor deDotProcessor = factory.create(config);
assertThat(deDotProcessor.getSeparator(), equalTo(DeDotProcessor.DEFAULT_SEPARATOR));
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class DeDotProcessorTests extends ESTestCase {
public void testSimple() throws Exception {
Map<String, Object> source = new HashMap<>();
source.put("a.b", "hello world!");
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
String separator = randomUnicodeOfCodepointLengthBetween(1, 10);
Processor processor = new DeDotProcessor(separator);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get("a" + separator + "b" ), equalTo("hello world!"));
}
public void testSimpleMap() throws Exception {
Map<String, Object> source = new HashMap<>();
Map<String, Object> subField = new HashMap<>();
subField.put("b.c", "hello world!");
source.put("a", subField);
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
Processor processor = new DeDotProcessor("_");
processor.execute(ingestDocument);
IngestDocument expectedDocument = new IngestDocument(
Collections.singletonMap("a", Collections.singletonMap("b_c", "hello world!")),
Collections.emptyMap());
assertThat(ingestDocument, equalTo(expectedDocument));
}
public void testSimpleList() throws Exception {
Map<String, Object> source = new HashMap<>();
Map<String, Object> subField = new HashMap<>();
subField.put("b.c", "hello world!");
source.put("a", Arrays.asList(subField));
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
Processor processor = new DeDotProcessor("_");
processor.execute(ingestDocument);
IngestDocument expectedDocument = new IngestDocument(
Collections.singletonMap("a",
Collections.singletonList(Collections.singletonMap("b_c", "hello world!"))),
Collections.emptyMap());
assertThat(ingestDocument, equalTo(expectedDocument));
}
}

View File

@ -463,6 +463,20 @@ to the requester.
}
--------------------------------------------------
==== DeDot Processor
The DeDot Processor is used to remove dots (".") from field names and
replace them with a specific `separator` string.
[source,js]
--------------------------------------------------
{
"dedot": {
"separator": "_"
}
}
--------------------------------------------------
=== Accessing data in pipelines
Processors in pipelines have read and write access to documents that pass through the pipeline.

View File

@ -0,0 +1,33 @@
---
"Test De-Dot Processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"dedot" : {
"separator" : "3"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {"a.b.c": "hello world"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.a3b3c: "hello world" }