diff --git a/core/src/main/java/org/elasticsearch/ingest/core/IngestDocument.java b/core/src/main/java/org/elasticsearch/ingest/core/IngestDocument.java index b5c40e172af..c8f87faa53e 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/IngestDocument.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/IngestDocument.java @@ -463,7 +463,7 @@ public final class IngestDocument { /** * Returns the document including its metadata fields, unless {@link #extractMetadata()} has been called, in which case the - * metadata fields will not be present anymore. Should be used only for reading. + * metadata fields will not be present anymore. * Modify the document instead using {@link #setFieldValue(String, Object)} and {@link #removeField(String)} */ public Map getSourceAndMetadata() { diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/DeDotProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/DeDotProcessor.java new file mode 100644 index 00000000000..81349bac1cf --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/processor/DeDotProcessor.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.core.ConfigurationUtils; +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.Processor; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Processor that replaces dots in document field names with a + * specified separator. + */ +public class DeDotProcessor implements Processor { + + public static final String TYPE = "dedot"; + static final String DEFAULT_SEPARATOR = "_"; + + private final String separator; + + public DeDotProcessor(String separator) { + this.separator = separator; + } + + public String getSeparator() { + return separator; + } + + @Override + public void execute(IngestDocument document) { + deDot(document.getSourceAndMetadata()); + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Recursively iterates through Maps and Lists in search of map entries with + * keys containing dots. The dots in these fields are replaced with {@link #separator}. + * + * @param obj The current object in context to be checked for dots in its fields. + */ + private void deDot(Object obj) { + if (obj instanceof Map) { + @SuppressWarnings("unchecked") + Map doc = (Map) obj; + Iterator> it = doc.entrySet().iterator(); + Map deDottedFields = new HashMap<>(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + deDot(entry.getValue()); + String fieldName = entry.getKey(); + if (fieldName.contains(".")) { + String deDottedFieldName = fieldName.replaceAll("\\.", separator); + deDottedFields.put(deDottedFieldName, entry.getValue()); + it.remove(); + } + } + doc.putAll(deDottedFields); + } else if (obj instanceof List) { + @SuppressWarnings("unchecked") + List list = (List) obj; + list.forEach(this::deDot); + } + } + + public static class Factory implements Processor.Factory { + + @Override + public DeDotProcessor create(Map config) throws Exception { + String separator = ConfigurationUtils.readOptionalStringProperty(config, "separator"); + if (separator == null) { + separator = DEFAULT_SEPARATOR; + } + return new DeDotProcessor(separator); + } + } +} + diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 8ef26296fe2..1844c269754 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -31,6 +31,7 @@ import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.ingest.processor.AppendProcessor; import org.elasticsearch.ingest.processor.ConvertProcessor; import org.elasticsearch.ingest.processor.DateProcessor; +import org.elasticsearch.ingest.processor.DeDotProcessor; import org.elasticsearch.ingest.processor.FailProcessor; import org.elasticsearch.ingest.processor.GsubProcessor; import org.elasticsearch.ingest.processor.JoinProcessor; @@ -77,6 +78,7 @@ public class NodeModule extends AbstractModule { registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory()); registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory()); registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new); + registerProcessor(DeDotProcessor.TYPE, (templateService) -> new DeDotProcessor.Factory()); } @Override diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/DeDotProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/DeDotProcessorFactoryTests.java new file mode 100644 index 00000000000..620958b142c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/processor/DeDotProcessorFactoryTests.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class DeDotProcessorFactoryTests extends ESTestCase { + + private DeDotProcessor.Factory factory; + + @Before + public void init() { + factory = new DeDotProcessor.Factory(); + } + + public void testCreate() throws Exception { + Map config = new HashMap<>(); + config.put("separator", "_"); + DeDotProcessor deDotProcessor = factory.create(config); + assertThat(deDotProcessor.getSeparator(), equalTo("_")); + } + + public void testCreateMissingSeparatorField() throws Exception { + Map config = new HashMap<>(); + DeDotProcessor deDotProcessor = factory.create(config); + assertThat(deDotProcessor.getSeparator(), equalTo(DeDotProcessor.DEFAULT_SEPARATOR)); + } + +} diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/DeDotProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/DeDotProcessorTests.java new file mode 100644 index 00000000000..be6426ede36 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/processor/DeDotProcessorTests.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class DeDotProcessorTests extends ESTestCase { + + public void testSimple() throws Exception { + Map source = new HashMap<>(); + source.put("a.b", "hello world!"); + IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap()); + String separator = randomUnicodeOfCodepointLengthBetween(1, 10); + Processor processor = new DeDotProcessor(separator); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get("a" + separator + "b" ), equalTo("hello world!")); + } + + public void testSimpleMap() throws Exception { + Map source = new HashMap<>(); + Map subField = new HashMap<>(); + subField.put("b.c", "hello world!"); + source.put("a", subField); + IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap()); + Processor processor = new DeDotProcessor("_"); + processor.execute(ingestDocument); + + IngestDocument expectedDocument = new IngestDocument( + Collections.singletonMap("a", Collections.singletonMap("b_c", "hello world!")), + Collections.emptyMap()); + assertThat(ingestDocument, equalTo(expectedDocument)); + } + + public void testSimpleList() throws Exception { + Map source = new HashMap<>(); + Map subField = new HashMap<>(); + subField.put("b.c", "hello world!"); + source.put("a", Arrays.asList(subField)); + IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap()); + Processor processor = new DeDotProcessor("_"); + processor.execute(ingestDocument); + + IngestDocument expectedDocument = new IngestDocument( + Collections.singletonMap("a", + Collections.singletonList(Collections.singletonMap("b_c", "hello world!"))), + Collections.emptyMap()); + assertThat(ingestDocument, equalTo(expectedDocument)); + } +} diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc index ea42932e561..906479412e7 100644 --- a/docs/plugins/ingest.asciidoc +++ b/docs/plugins/ingest.asciidoc @@ -463,6 +463,20 @@ to the requester. } -------------------------------------------------- +==== DeDot Processor +The DeDot Processor is used to remove dots (".") from field names and +replace them with a specific `separator` string. + +[source,js] +-------------------------------------------------- +{ + "dedot": { + "separator": "_" + } +} +-------------------------------------------------- + + === Accessing data in pipelines Processors in pipelines have read and write access to documents that pass through the pipeline. diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/80_dedot_processor.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/80_dedot_processor.yaml new file mode 100644 index 00000000000..8aedb4099d8 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/80_dedot_processor.yaml @@ -0,0 +1,33 @@ +--- +"Test De-Dot Processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "dedot" : { + "separator" : "3" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {"a.b.c": "hello world"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.a3b3c: "hello world" } +