From 7a6adfd93ad4e41663642124e29993e14b6f19df Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 4 Feb 2016 00:57:58 +0100 Subject: [PATCH] ingest: Added foreach processor. This processor is useful when all elements of a json array need to be processed in the same way. This avoids having to define a processor for each element in an array. Also, the number of elements inside a json array is very likely unknown. --- .../ingest/SimulatePipelineRequest.java | 4 +- .../elasticsearch/ingest/IngestService.java | 8 +- .../elasticsearch/ingest/PipelineStore.java | 29 +-- .../ingest/ProcessorsRegistry.java | 66 +++++-- .../ingest/core/ConfigurationUtils.java | 35 +++- .../elasticsearch/ingest/core/Pipeline.java | 38 +--- .../ingest/processor/ForEachProcessor.java | 105 +++++++++++ .../org/elasticsearch/node/NodeModule.java | 40 +++-- .../node/service/NodeService.java | 5 +- .../SimulatePipelineRequestParsingTests.java | 9 +- .../elasticsearch/ingest/IngestClientIT.java | 2 +- .../ingest/PipelineStoreTests.java | 6 +- .../ingest/ProcessorsRegistryTests.java | 32 ++-- .../ingest/core/ConfigurationUtilsTests.java | 32 ++++ .../ingest/core/PipelineFactoryTests.java | 21 ++- .../ForEachProcessorFactoryTests.java | 70 ++++++++ .../processor/ForEachProcessorTests.java | 169 ++++++++++++++++++ docs/reference/ingest/ingest.asciidoc | 145 +++++++++++++++ .../ingest/grok/IngestGrokPlugin.java | 2 +- .../ingest/geoip/IngestGeoIpPlugin.java | 2 +- .../ingest/CombineProcessorsTests.java | 31 ++-- .../rest-api-spec/test/ingest/80_foreach.yaml | 41 +++++ 22 files changed, 748 insertions(+), 144 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java create mode 100644 core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorFactoryTests.java create mode 100644 core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java create mode 100644 
rest-api-spec/src/main/resources/rest-api-spec/test/ingest/80_foreach.yaml diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 847de99f372..0a8cbdfe0c1 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -34,9 +34,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; -import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.ingest.core.IngestDocument.MetaData; public class SimulatePipelineRequest extends ActionRequest { @@ -140,7 +138,7 @@ public class SimulatePipelineRequest extends ActionRequest config, boolean verbose, PipelineStore pipelineStore) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); - Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); + Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorRegistry()); List ingestDocumentList = parseDocs(config); return new Parsed(pipeline, ingestDocumentList, verbose); } diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index 8af82b28a38..78a1f66fb80 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -33,10 +33,10 @@ public class IngestService implements Closeable { private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; - private final ProcessorsRegistry processorsRegistry; + private final 
ProcessorsRegistry.Builder processorsRegistryBuilder; - public IngestService(Settings settings, ThreadPool threadPool, ProcessorsRegistry processorsRegistry) { - this.processorsRegistry = processorsRegistry; + public IngestService(Settings settings, ThreadPool threadPool, ProcessorsRegistry.Builder processorsRegistryBuilder) { + this.processorsRegistryBuilder = processorsRegistryBuilder; this.pipelineStore = new PipelineStore(settings); this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); } @@ -50,7 +50,7 @@ public class IngestService implements Closeable { } public void setScriptService(ScriptService scriptService) { - pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, scriptService); + pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService); } @Override diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index e2d68199f43..3999f357b86 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -19,7 +19,6 @@ package org.elasticsearch.ingest; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -48,12 +47,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener { private final Pipeline.Factory factory = new Pipeline.Factory(); - private Map processorFactoryRegistry; + private ProcessorsRegistry processorRegistry; // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. 
// We know of all the processor factories when a node with all its plugin have been initialized. Also some @@ -65,27 +63,16 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust super(settings); } - public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, ScriptService scriptService) { - Map processorFactories = new HashMap<>(); + public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService) { TemplateService templateService = new InternalTemplateService(scriptService); - for (Map.Entry>> entry : processorsRegistry.entrySet()) { - Processor.Factory processorFactory = entry.getValue().apply(templateService); - processorFactories.put(entry.getKey(), processorFactory); - } - this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories); + this.processorRegistry = processorsRegistryBuilder.build(templateService); } @Override public void close() throws IOException { // TODO: When org.elasticsearch.node.Node can close Closable instances we should try to remove this code, // since any wired closable should be able to close itself - List closeables = new ArrayList<>(); - for (Processor.Factory factory : processorFactoryRegistry.values()) { - if (factory instanceof Closeable) { - closeables.add((Closeable) factory); - } - } - IOUtils.close(closeables); + processorRegistry.close(); } @Override @@ -102,7 +89,7 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust Map pipelines = new HashMap<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { - pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactoryRegistry)); + pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorRegistry)); } catch (ElasticsearchParseException e) { throw e; } catch (Exception e) { @@ -156,7 +143,7 @@ public 
class PipelineStore extends AbstractComponent implements Closeable, Clust // validates the pipeline and processor configuration before submitting a cluster update task: Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2(); try { - factory.create(request.getId(), pipelineConfig, processorFactoryRegistry); + factory.create(request.getId(), pipelineConfig, processorRegistry); } catch(Exception e) { listener.onFailure(e); return; @@ -199,8 +186,8 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust return pipelines.get(id); } - public Map getProcessorFactoryRegistry() { - return processorFactoryRegistry; + public ProcessorsRegistry getProcessorRegistry() { + return processorRegistry; } /** diff --git a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java index 766ba772932..bd885c578b3 100644 --- a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java +++ b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java @@ -19,29 +19,69 @@ package org.elasticsearch.ingest; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.function.Function; +import java.util.function.BiFunction; -public class ProcessorsRegistry { +public final class ProcessorsRegistry implements Closeable { - private final Map>> processorFactoryProviders = new HashMap<>(); + private final Map processorFactories; - /** - * Adds a processor factory under a specific name. 
- */ - public void registerProcessor(String name, Function> processorFactoryProvider) { - Function> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider); - if (provider != null) { - throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]"); + private ProcessorsRegistry(TemplateService templateService, + Map>> providers) { + Map processorFactories = new HashMap<>(); + for (Map.Entry>> entry : providers.entrySet()) { + processorFactories.put(entry.getKey(), entry.getValue().apply(templateService, this)); } + this.processorFactories = Collections.unmodifiableMap(processorFactories); } - public Set>>> entrySet() { - return processorFactoryProviders.entrySet(); + public Processor.Factory getProcessorFactory(String name) { + return processorFactories.get(name); + } + + @Override + public void close() throws IOException { + List closeables = new ArrayList<>(); + for (Processor.Factory factory : processorFactories.values()) { + if (factory instanceof Closeable) { + closeables.add((Closeable) factory); + } + } + IOUtils.close(closeables); + } + + // For testing: + Map getProcessorFactories() { + return processorFactories; + } + + public static final class Builder { + + private final Map>> providers = new HashMap<>(); + + /** + * Adds a processor factory under a specific name. 
+ */ + public void registerProcessor(String name, BiFunction> provider) { + BiFunction> previous = this.providers.putIfAbsent(name, provider); + if (previous != null) { + throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]"); + } + } + + public ProcessorsRegistry build(TemplateService templateService) { + return new ProcessorsRegistry(templateService, providers); + } + } } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java index bd3fd8cfb6e..3e99b3db816 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java @@ -19,9 +19,12 @@ package org.elasticsearch.ingest.core; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ingest.ProcessorsRegistry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -178,4 +181,34 @@ public final class ConfigurationUtils { } return exception; } + + public static List readProcessorConfigs(List>> processorConfigs, ProcessorsRegistry processorRegistry) throws Exception { + List processors = new ArrayList<>(); + if (processorConfigs != null) { + for (Map> processorConfigWithKey : processorConfigs) { + for (Map.Entry> entry : processorConfigWithKey.entrySet()) { + processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue())); + } + } + } + return processors; + } + + private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map config) throws Exception { + Processor.Factory factory = processorRegistry.getProcessorFactory(type); + if (factory != null) { + List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); + List 
onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); + Processor processor; + processor = factory.create(config); + if (!config.isEmpty()) { + throw new ElasticsearchParseException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + } + if (onFailureProcessors.isEmpty()) { + return processor; + } + return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); + } + throw new ElasticsearchParseException("No processor type exists with name [" + type + "]"); + } } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java index 1c560fa6bcc..9b887ec229c 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java @@ -20,8 +20,8 @@ package org.elasticsearch.ingest.core; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ingest.ProcessorsRegistry; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -85,12 +85,12 @@ public final class Pipeline { public final static class Factory { - public Pipeline create(String id, Map config, Map processorRegistry) throws Exception { + public Pipeline create(String id, Map config, ProcessorsRegistry processorRegistry) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); List>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = readProcessorConfigs(processorConfigs, processorRegistry); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry); List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = 
readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); + List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); if (config.isEmpty() == false) { throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); } @@ -98,35 +98,5 @@ public final class Pipeline { return new Pipeline(id, description, compoundProcessor); } - private List readProcessorConfigs(List>> processorConfigs, Map processorRegistry) throws Exception { - List processors = new ArrayList<>(); - if (processorConfigs != null) { - for (Map> processorConfigWithKey : processorConfigs) { - for (Map.Entry> entry : processorConfigWithKey.entrySet()) { - processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue())); - } - } - } - - return processors; - } - - private Processor readProcessor(Map processorRegistry, String type, Map config) throws Exception { - Processor.Factory factory = processorRegistry.get(type); - if (factory != null) { - List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); - Processor processor; - processor = factory.create(config); - if (!config.isEmpty()) { - throw new ElasticsearchParseException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); - } - if (onFailureProcessors.isEmpty()) { - return processor; - } - return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); - } - throw new ElasticsearchParseException("No processor type exists with name [" + type + "]"); - } } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java 
b/core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java new file mode 100644 index 00000000000..5b101fbfb32 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/processor/ForEachProcessor.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.core.AbstractProcessor; +import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.ConfigurationUtils; +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.Processor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.ingest.core.ConfigurationUtils.readList; +import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty; + +/** + * A processor that, for each value in a list, executes one or more processors. + * + * This can be useful for doing string operations on a json array of strings, + * or for removing a field from objects inside a json array. 
+ */ +public final class ForEachProcessor extends AbstractProcessor { + + public static final String TYPE = "foreach"; + + private final String field; + private final List processors; + + ForEachProcessor(String tag, String field, List processors) { + super(tag); + this.field = field; + this.processors = processors; + } + + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + List values = ingestDocument.getFieldValue(field, List.class); + List newValues = new ArrayList<>(values.size()); + for (Object value : values) { + Map innerSource = new HashMap<>(); + innerSource.put("_value", value); + for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { + innerSource.put(metaData.getFieldName(), ingestDocument.getSourceAndMetadata().get(metaData.getFieldName())); + } + IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata()); + for (Processor processor : processors) { + processor.execute(innerIngestDocument); + } + newValues.add(innerSource.get("_value")); + } + ingestDocument.setFieldValue(field, newValues); + } + + @Override + public String getType() { + return TYPE; + } + + String getField() { + return field; + } + + List getProcessors() { + return processors; + } + + public static final class Factory extends AbstractProcessorFactory { + + private final ProcessorsRegistry processorRegistry; + + public Factory(ProcessorsRegistry processorRegistry) { + this.processorRegistry = processorRegistry; + } + + @Override + protected ForEachProcessor doCreate(String tag, Map config) throws Exception { + String field = readStringProperty(TYPE, tag, config, "field"); + List>> processorConfigs = readList(TYPE, tag, config, "processors"); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry); + return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors)); + } + } +} diff --git 
a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 365e260ebf0..935b240b3d9 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -29,6 +29,7 @@ import org.elasticsearch.ingest.processor.AppendProcessor; import org.elasticsearch.ingest.processor.ConvertProcessor; import org.elasticsearch.ingest.processor.DateProcessor; import org.elasticsearch.ingest.processor.FailProcessor; +import org.elasticsearch.ingest.processor.ForEachProcessor; import org.elasticsearch.ingest.processor.GsubProcessor; import org.elasticsearch.ingest.processor.JoinProcessor; import org.elasticsearch.ingest.processor.LowercaseProcessor; @@ -41,7 +42,7 @@ import org.elasticsearch.ingest.processor.UppercaseProcessor; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.service.NodeService; -import java.util.function.Function; +import java.util.function.BiFunction; /** * @@ -50,7 +51,7 @@ public class NodeModule extends AbstractModule { private final Node node; private final MonitorService monitorService; - private final ProcessorsRegistry processorsRegistry; + private final ProcessorsRegistry.Builder processorsRegistryBuilder; // pkg private so tests can mock Class pageCacheRecyclerImpl = PageCacheRecycler.class; @@ -59,21 +60,22 @@ public class NodeModule extends AbstractModule { public NodeModule(Node node, MonitorService monitorService) { this.node = node; this.monitorService = monitorService; - this.processorsRegistry = new ProcessorsRegistry(); + this.processorsRegistryBuilder = new ProcessorsRegistry.Builder(); - registerProcessor(DateProcessor.TYPE, (templateService) -> new DateProcessor.Factory()); - registerProcessor(SetProcessor.TYPE, SetProcessor.Factory::new); - registerProcessor(AppendProcessor.TYPE, AppendProcessor.Factory::new); - registerProcessor(RenameProcessor.TYPE, (templateService) -> new 
RenameProcessor.Factory()); - registerProcessor(RemoveProcessor.TYPE, RemoveProcessor.Factory::new); - registerProcessor(SplitProcessor.TYPE, (templateService) -> new SplitProcessor.Factory()); - registerProcessor(JoinProcessor.TYPE, (templateService) -> new JoinProcessor.Factory()); - registerProcessor(UppercaseProcessor.TYPE, (templateService) -> new UppercaseProcessor.Factory()); - registerProcessor(LowercaseProcessor.TYPE, (templateService) -> new LowercaseProcessor.Factory()); - registerProcessor(TrimProcessor.TYPE, (templateService) -> new TrimProcessor.Factory()); - registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory()); - registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory()); - registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new); + registerProcessor(DateProcessor.TYPE, (templateService, registry) -> new DateProcessor.Factory()); + registerProcessor(SetProcessor.TYPE, (templateService, registry) -> new SetProcessor.Factory(templateService)); + registerProcessor(AppendProcessor.TYPE, (templateService, registry) -> new AppendProcessor.Factory(templateService)); + registerProcessor(RenameProcessor.TYPE, (templateService, registry) -> new RenameProcessor.Factory()); + registerProcessor(RemoveProcessor.TYPE, (templateService, registry) -> new RemoveProcessor.Factory(templateService)); + registerProcessor(SplitProcessor.TYPE, (templateService, registry) -> new SplitProcessor.Factory()); + registerProcessor(JoinProcessor.TYPE, (templateService, registry) -> new JoinProcessor.Factory()); + registerProcessor(UppercaseProcessor.TYPE, (templateService, registry) -> new UppercaseProcessor.Factory()); + registerProcessor(LowercaseProcessor.TYPE, (templateService, registry) -> new LowercaseProcessor.Factory()); + registerProcessor(TrimProcessor.TYPE, (templateService, registry) -> new TrimProcessor.Factory()); + registerProcessor(ConvertProcessor.TYPE, (templateService, registry) -> new 
ConvertProcessor.Factory()); + registerProcessor(GsubProcessor.TYPE, (templateService, registry) -> new GsubProcessor.Factory()); + registerProcessor(FailProcessor.TYPE, (templateService, registry) -> new FailProcessor.Factory(templateService)); + registerProcessor(ForEachProcessor.TYPE, (templateService, registry) -> new ForEachProcessor.Factory(registry)); } @Override @@ -92,7 +94,7 @@ public class NodeModule extends AbstractModule { bind(Node.class).toInstance(node); bind(MonitorService.class).toInstance(monitorService); bind(NodeService.class).asEagerSingleton(); - bind(ProcessorsRegistry.class).toInstance(processorsRegistry); + bind(ProcessorsRegistry.Builder.class).toInstance(processorsRegistryBuilder); } /** @@ -105,7 +107,7 @@ public class NodeModule extends AbstractModule { /** * Adds a processor factory under a specific type name. */ - public void registerProcessor(String type, Function> processorFactoryProvider) { - processorsRegistry.registerProcessor(type, processorFactoryProvider); + public void registerProcessor(String type, BiFunction> provider) { + processorsRegistryBuilder.registerProcessor(type, provider); } } diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/service/NodeService.java index 0f45fdbe549..b5b8e8f2cb6 100644 --- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.env.Environment; import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -78,7 +77,7 @@ public class NodeService extends AbstractComponent implements Closeable { public 
NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, TransportService transportService, IndicesService indicesService, PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version, - ProcessorsRegistry processorsRegistry, ClusterService clusterService, SettingsFilter settingsFilter) { + ProcessorsRegistry.Builder processorsRegistryBuilder, ClusterService clusterService, SettingsFilter settingsFilter) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -89,7 +88,7 @@ public class NodeService extends AbstractComponent implements Closeable { this.version = version; this.pluginService = pluginService; this.circuitBreakerService = circuitBreakerService; - this.ingestService = new IngestService(settings, threadPool, processorsRegistry); + this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder); this.settingsFilter = settingsFilter; clusterService.add(ingestService.getPipelineStore()); } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index c0e7d6921ac..de0a28fd7e1 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -20,7 +20,9 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.core.CompoundProcessor; import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.Pipeline; @@ -54,11 +56,12 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { TestProcessor 
processor = new TestProcessor(ingestDocument -> {}); CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor); Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor); - Map processorRegistry = new HashMap<>(); - processorRegistry.put("mock_processor", mock(Processor.Factory.class)); + ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder(); + processorRegistryBuilder.registerProcessor("mock_processor", ((templateService, registry) -> mock(Processor.Factory.class))); + ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(TestTemplateService.instance()); store = mock(PipelineStore.class); when(store.get(SimulatePipelineRequest.SIMULATED_PIPELINE_ID)).thenReturn(pipeline); - when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry); + when(store.getProcessorRegistry()).thenReturn(processorRegistry); } public void testParseUsingPipelineStore() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index e5fcba2a3be..f0d12158752 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -244,7 +244,7 @@ public class IngestClientIT extends ESIntegTestCase { } public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor("test", templateService -> config -> + nodeModule.registerProcessor("test", (templateService, registry) -> config -> new TestProcessor("id", "test", ingestDocument -> { ingestDocument.setFieldValue("processed", true); if (ingestDocument.getFieldValue("fail", Boolean.class)) { diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index bdf1f7d49c1..fb0605f90b5 100644 --- 
a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -50,9 +50,9 @@ public class PipelineStoreTests extends ESTestCase { @Before public void init() throws Exception { store = new PipelineStore(Settings.EMPTY); - ProcessorsRegistry registry = new ProcessorsRegistry(); - registry.registerProcessor("set", (templateService) -> new SetProcessor.Factory(TestTemplateService.instance())); - store.buildProcessorFactoryRegistry(registry, null); + ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder(); + registryBuilder.registerProcessor("set", (templateService, registry) -> new SetProcessor.Factory(TestTemplateService.instance())); + store.buildProcessorFactoryRegistry(registryBuilder, null); } public void testUpdatePipelines() { diff --git a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java b/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java index ad18488d990..26edb54e6fa 100644 --- a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java @@ -19,42 +19,30 @@ package org.elasticsearch.ingest; -import org.elasticsearch.ingest.core.Processor; -import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.test.ESTestCase; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; - import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; public class ProcessorsRegistryTests extends ESTestCase { - public void testAddProcessor() { - ProcessorsRegistry processorsRegistry = new ProcessorsRegistry(); + public void testBuildProcessorRegistry() { + ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); TestProcessor.Factory factory1 = new TestProcessor.Factory(); - processorsRegistry.registerProcessor("1", (templateService) -> 
factory1); + builder.registerProcessor("1", (templateService, registry) -> factory1); TestProcessor.Factory factory2 = new TestProcessor.Factory(); - processorsRegistry.registerProcessor("2", (templateService) -> factory2); + builder.registerProcessor("2", (templateService, registry) -> factory2); TestProcessor.Factory factory3 = new TestProcessor.Factory(); try { - processorsRegistry.registerProcessor("1", (templateService) -> factory3); + builder.registerProcessor("1", (templateService, registry) -> factory3); fail("addProcessor should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]")); } - Set>>> entrySet = processorsRegistry.entrySet(); - assertThat(entrySet.size(), equalTo(2)); - for (Map.Entry>> entry : entrySet) { - if (entry.getKey().equals("1")) { - assertThat(entry.getValue().apply(null), equalTo(factory1)); - } else if (entry.getKey().equals("2")) { - assertThat(entry.getValue().apply(null), equalTo(factory2)); - } else { - fail("unexpected processor id [" + entry.getKey() + "]"); - } - } + ProcessorsRegistry registry = builder.build(TestTemplateService.instance()); + assertThat(registry.getProcessorFactories().size(), equalTo(2)); + assertThat(registry.getProcessorFactory("1"), sameInstance(factory1)); + assertThat(registry.getProcessorFactory("2"), sameInstance(factory2)); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java b/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java index 722f14e396e..35765b41159 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.core; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.TestTemplateService; 
import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -31,6 +33,9 @@ import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; public class ConfigurationUtilsTests extends ESTestCase { @@ -68,4 +73,31 @@ public class ConfigurationUtilsTests extends ESTestCase { List val = ConfigurationUtils.readList(null, null, config, "int"); assertThat(val, equalTo(Collections.singletonList(2))); } + + public void testReadProcessors() throws Exception { + Processor processor = mock(Processor.class); + ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); + builder.registerProcessor("test_processor", (templateService, registry) -> config -> processor); + ProcessorsRegistry registry = builder.build(TestTemplateService.instance()); + + + List>> config = new ArrayList<>(); + Map emptyConfig = Collections.emptyMap(); + config.add(Collections.singletonMap("test_processor", emptyConfig)); + config.add(Collections.singletonMap("test_processor", emptyConfig)); + + List result = ConfigurationUtils.readProcessorConfigs(config, registry); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0), sameInstance(processor)); + assertThat(result.get(1), sameInstance(processor)); + + config.add(Collections.singletonMap("unknown_processor", emptyConfig)); + try { + ConfigurationUtils.readProcessorConfigs(config, registry); + fail("exception expected"); + } catch (ElasticsearchParseException e) { + assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]")); + } + } + } diff --git a/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java index 04f887e9383..fdf48ff4281 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java +++ 
b/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java @@ -20,13 +20,16 @@ package org.elasticsearch.ingest.core; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.prefs.PreferencesFactory; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -41,7 +44,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); + ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); @@ -57,7 +60,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); Pipeline.Factory factory = new Pipeline.Factory(); try { - factory.create("_id", pipelineConfig, Collections.emptyMap()); + factory.create("_id", pipelineConfig, createProcessorRegistry(Collections.emptyMap())); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -71,7 +74,7 @@ public class PipelineFactoryTests extends ESTestCase { 
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); + ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); @@ -88,7 +91,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); + ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); try { factory.create("_id", pipelineConfig, processorRegistry); } catch (ElasticsearchParseException e) { @@ -104,11 +107,19 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); + ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); 
assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getProcessors().size(), equalTo(1)); assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound")); } + + private ProcessorsRegistry createProcessorRegistry(Map processorRegistry) { + ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); + for (Map.Entry entry : processorRegistry.entrySet()) { + builder.registerProcessor(entry.getKey(), ((templateService, registry) -> entry.getValue())); + } + return builder.build(TestTemplateService.instance()); + } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorFactoryTests.java new file mode 100644 index 00000000000..d03c2c2c546 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorFactoryTests.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.ingest.TestTemplateService; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class ForEachProcessorFactoryTests extends ESTestCase { + + public void testCreate() throws Exception { + ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); + Processor processor = new TestProcessor(ingestDocument -> {}); + builder.registerProcessor("_name", (templateService, registry) -> config -> processor); + ProcessorsRegistry registry = builder.build(TestTemplateService.instance()); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(registry); + + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); + ForEachProcessor forEachProcessor = forEachFactory.create(config); + assertThat(forEachProcessor, Matchers.notNullValue()); + assertThat(forEachProcessor.getField(), Matchers.equalTo("_field")); + assertThat(forEachProcessor.getProcessors().size(), Matchers.equalTo(1)); + assertThat(forEachProcessor.getProcessors().get(0), Matchers.sameInstance(processor)); + + config = new HashMap<>(); + config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); + try { + forEachFactory.create(config); + fail("exception expected"); + } catch (Exception e) { + assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing")); + } + + config = new HashMap<>(); + config.put("field", "_field"); + try { + forEachFactory.create(config); + fail("exception expected"); + } catch (Exception e) { + assertThat(e.getMessage(), 
Matchers.equalTo("[processors] required property is missing")); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java new file mode 100644 index 00000000000..2ef911e270c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/processor/ForEachProcessorTests.java @@ -0,0 +1,169 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.ingest.core.CompoundProcessor; +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class ForEachProcessorTests extends ESTestCase { + + public void testExecute() throws Exception { + List values = new ArrayList<>(); + values.add("foo"); + values.add("bar"); + values.add("baz"); + IngestDocument ingestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) + ); + + ForEachProcessor processor = new ForEachProcessor( + "_tag", "values", Collections.singletonList(new UppercaseProcessor("_tag", "_value")) + ); + processor.execute(ingestDocument); + + List result = ingestDocument.getFieldValue("values", List.class); + assertThat(result.get(0), equalTo("FOO")); + assertThat(result.get(1), equalTo("BAR")); + assertThat(result.get(2), equalTo("BAZ")); + } + + public void testExecuteWithFailure() throws Exception { + IngestDocument ingestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c")) + ); + + TestProcessor testProcessor = new TestProcessor(id -> { + if ("c".equals(id.getFieldValue("_value", String.class))) { + throw new RuntimeException("failure"); + } + }); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(testProcessor)); + try { + processor.execute(ingestDocument); + fail("exception expected"); + } catch (RuntimeException e) { + assertThat(e.getMessage(), equalTo("failure")); + } + 
assertThat(testProcessor.getInvokedCounter(), equalTo(3)); + assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c"))); + + testProcessor = new TestProcessor(id -> { + String value = id.getFieldValue("_value", String.class); + if ("c".equals(value)) { + throw new RuntimeException("failure"); + } else { + id.setFieldValue("_value", value.toUpperCase(Locale.ROOT)); + } + }); + Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); + processor = new ForEachProcessor( + "_tag", "values", + Collections.singletonList(new CompoundProcessor(Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))) + ); + processor.execute(ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(3)); + assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c"))); + } + + public void testMetaDataAvailable() throws Exception { + List> values = new ArrayList<>(); + values.add(new HashMap<>()); + values.add(new HashMap<>()); + IngestDocument ingestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) + ); + + TestProcessor innerProcessor = new TestProcessor(id -> { + id.setFieldValue("_value.index", id.getSourceAndMetadata().get("_index")); + id.setFieldValue("_value.type", id.getSourceAndMetadata().get("_type")); + id.setFieldValue("_value.id", id.getSourceAndMetadata().get("_id")); + }); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(innerProcessor)); + processor.execute(ingestDocument); + + assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); + assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index")); + assertThat(ingestDocument.getFieldValue("values.0.type", String.class), equalTo("_type")); + assertThat(ingestDocument.getFieldValue("values.0.id", String.class), equalTo("_id")); + 
assertThat(ingestDocument.getFieldValue("values.1.index", String.class), equalTo("_index")); + assertThat(ingestDocument.getFieldValue("values.1.type", String.class), equalTo("_type")); + assertThat(ingestDocument.getFieldValue("values.1.id", String.class), equalTo("_id")); + } + + public void testRandom() throws Exception { + int numProcessors = randomInt(8); + List processors = new ArrayList<>(numProcessors); + for (int i = 0; i < numProcessors; i++) { + processors.add(new Processor() { + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + String existingValue = ingestDocument.getFieldValue("_value", String.class); + ingestDocument.setFieldValue("_value", existingValue + "."); + } + + @Override + public String getType() { + return null; + } + + @Override + public String getTag() { + return null; + } + }); + } + int numValues = randomIntBetween(1, 32); + List values = new ArrayList<>(numValues); + for (int i = 0; i < numValues; i++) { + values.add(""); + } + IngestDocument ingestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) + ); + + ForEachProcessor processor = new ForEachProcessor("_tag", "values", processors); + processor.execute(ingestDocument); + List result = ingestDocument.getFieldValue("values", List.class); + assertThat(result.size(), equalTo(numValues)); + + String expectedString = ""; + for (int i = 0; i < numProcessors; i++) { + expectedString = expectedString + "."; + } + + for (String r : result) { + assertThat(r, equalTo(expectedString)); + } + } + +} diff --git a/docs/reference/ingest/ingest.asciidoc b/docs/reference/ingest/ingest.asciidoc index e9226e7d537..7df3bebb006 100644 --- a/docs/reference/ingest/ingest.asciidoc +++ b/docs/reference/ingest/ingest.asciidoc @@ -534,6 +534,151 @@ to the requester. 
} -------------------------------------------------- +==== Foreach processor +All processors can operate on elements inside an array, but if all elements of an array need to +be processed in the same way, defining a processor for each element becomes cumbersome and tricky +because it is likely that the number of elements in an array is unknown. For this reason the `foreach` +processor exists. By specifying the field holding array elements and a list of processors that +define what should happen to each element, array fields can easily be preprocessed. + +Processors inside the foreach processor work in a different context and the only valid top level +field is `_value`, which holds the array element value. Under this field other fields may exist. + +If the `foreach` processor fails to process an element inside the array and no `on_failure` processor has been specified, +then it aborts the execution and leaves the array unmodified. + +[[foreach-options]] +.Foreach Options +[options="header"] +|====== +| Name | Required | Default | Description +| `field` | yes | - | The array field +| `processors` | yes | - | The processors +|====== + +Assume the following document: + +[source,js] +-------------------------------------------------- +{ + "values" : ["foo", "bar", "baz"] +} +-------------------------------------------------- + +When this `foreach` processor operates on this sample document: + +[source,js] +-------------------------------------------------- +{ + "foreach" : { + "field" : "values", + "processors" : [ + { + "uppercase" : { + "field" : "_value" + } + } + ] + } +} +-------------------------------------------------- + +Then the document will look like this after preprocessing: + +[source,js] +-------------------------------------------------- +{ + "values" : ["FOO", "BAR", "BAZ"] +} +-------------------------------------------------- + +Let's take a look at another example: + +[source,js] +-------------------------------------------------- +{ + "persons" : [ + 
{ + "id" : "1", + "name" : "John Doe" + }, + { + "id" : "2", + "name" : "Jane Doe" + } + ] +} +-------------------------------------------------- + +and in the case where the `id` field needs to be removed, +the following `foreach` processor can be used: + +[source,js] +-------------------------------------------------- +{ + "foreach" : { + "field" : "persons", + "processors" : [ + { + "remove" : { + "field" : "_value.id" + } + } + ] + } +} +-------------------------------------------------- + +After preprocessing the result is: + +[source,js] +-------------------------------------------------- +{ + "persons" : [ + { + "name" : "John Doe" + }, + { + "name" : "Jane Doe" + } + ] +} +-------------------------------------------------- + +As with any processor, `on_failure` processors can also be defined +in processors that are wrapped inside the `foreach` processor. + +For example, the `id` field may not exist on all person objects, and +instead of failing the index request, the document will be sent to +the 'failure_index' index for later inspection: + +[source,js] +-------------------------------------------------- +{ + "foreach" : { + "field" : "persons", + "processors" : [ + { + "remove" : { + "field" : "_value.id", + "on_failure" : [ + { + "set" : { + "field" : "_index", + "value" : "failure_index" + } + } + ] + } + } + ] + } +} +-------------------------------------------------- + +In this example, if the `remove` processor does fail, then +the array elements that have been processed thus far will +be updated. 
=== Accessing data in pipelines diff --git a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java index 9ccccadbff3..998c536b474 100644 --- a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java +++ b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java @@ -56,7 +56,7 @@ public class IngestGrokPlugin extends Plugin { } public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService) -> new GrokProcessor.Factory(builtinPatterns)); + nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService, registry) -> new GrokProcessor.Factory(builtinPatterns)); } public static Map loadBuiltinPatterns() throws IOException { diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 570b1e2d18f..7c57349f4b6 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -50,7 +50,7 @@ public class IngestGeoIpPlugin extends Plugin { public void onModule(NodeModule nodeModule) throws IOException { Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip"); Map databaseReaders = loadDatabaseReaders(geoIpConfigDirectory); - nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService) -> new GeoIpProcessor.Factory(databaseReaders)); + nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService, registry) -> new GeoIpProcessor.Factory(databaseReaders)); } public static Map loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException { diff --git 
a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/CombineProcessorsTests.java b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/CombineProcessorsTests.java index 0245233a159..ecf1b0297c7 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/CombineProcessorsTests.java +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/CombineProcessorsTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.ingest.grok.IngestGrokPlugin; import org.elasticsearch.ingest.processor.AppendProcessor; import org.elasticsearch.ingest.processor.ConvertProcessor; import org.elasticsearch.ingest.processor.DateProcessor; +import org.elasticsearch.ingest.processor.ForEachProcessor; import org.elasticsearch.ingest.processor.LowercaseProcessor; import org.elasticsearch.ingest.processor.RemoveProcessor; import org.elasticsearch.ingest.processor.RenameProcessor; @@ -49,6 +50,7 @@ import java.io.ByteArrayInputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -155,10 +157,17 @@ public class CombineProcessorsTests extends ESTestCase { @SuppressWarnings("unchecked") public void testMutate() throws Exception { + ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); + builder.registerProcessor("remove", (templateService, registry) -> new RemoveProcessor.Factory(templateService)); + builder.registerProcessor("trim", (templateService, registry) -> new TrimProcessor.Factory()); + ProcessorsRegistry registry = builder.build(TestTemplateService.instance()); + Map config = new HashMap<>(); - // TODO: when we add foreach processor we should delete all friends.id fields - config.put("field", "friends.0.id"); - RemoveProcessor processor1 = new RemoveProcessor.Factory(TestTemplateService.instance()).create(config); + 
config.put("field", "friends"); + Map removeConfig = new HashMap<>(); + removeConfig.put("field", "_value.id"); + config.put("processors", Collections.singletonList(Collections.singletonMap("remove", removeConfig))); + ForEachProcessor processor1 = new ForEachProcessor.Factory(registry).create(config); config = new HashMap<>(); config.put("field", "tags"); config.put("value", "new_value"); @@ -168,9 +177,11 @@ public class CombineProcessorsTests extends ESTestCase { config.put("separator", ","); SplitProcessor processor3 = new SplitProcessor.Factory().create(config); config = new HashMap<>(); - // TODO: when we add foreach processor, then change the test to trim all address values - config.put("field", "address.1"); - TrimProcessor processor4 = new TrimProcessor.Factory().create(config); + config.put("field", "address"); + Map trimConfig = new HashMap<>(); + trimConfig.put("field", "_value"); + config.put("processors", Collections.singletonList(Collections.singletonMap("trim", trimConfig))); + ForEachProcessor processor4 = new ForEachProcessor.Factory(registry).create(config); config = new HashMap<>(); config.put("field", "company"); LowercaseProcessor processor5 = new LowercaseProcessor.Factory().create(config); @@ -190,16 +201,16 @@ public class CombineProcessorsTests extends ESTestCase { pipeline.execute(document); assertThat(((List>) document.getSourceAndMetadata().get("friends")).get(0).get("id"), nullValue()); - assertThat(((List>) document.getSourceAndMetadata().get("friends")).get(1).get("id"), equalTo(1)); - assertThat(((List>) document.getSourceAndMetadata().get("friends")).get(2).get("id"), equalTo(2)); + assertThat(((List>) document.getSourceAndMetadata().get("friends")).get(1).get("id"), nullValue()); + assertThat(((List>) document.getSourceAndMetadata().get("friends")).get(2).get("id"), nullValue()); assertThat(document.getFieldValue("tags.7", String.class), equalTo("new_value")); List addressDetails = document.getFieldValue("address", List.class); 
assertThat(addressDetails.size(), equalTo(4)); assertThat(addressDetails.get(0), equalTo("713 Bartlett Place")); assertThat(addressDetails.get(1), equalTo("Accoville")); - assertThat(addressDetails.get(2), equalTo(" Puerto Rico")); - assertThat(addressDetails.get(3), equalTo(" 9221")); + assertThat(addressDetails.get(2), equalTo("Puerto Rico")); + assertThat(addressDetails.get(3), equalTo("9221")); assertThat(document.getSourceAndMetadata().get("company"), equalTo("atgen")); assertThat(document.getSourceAndMetadata().get("gender"), equalTo("MALE")); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/80_foreach.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/80_foreach.yaml new file mode 100644 index 00000000000..09ef359a8c9 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/80_foreach.yaml @@ -0,0 +1,41 @@ +--- +"Test foreach Processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "foreach" : { + "field" : "values", + "processors" : [ + { + "uppercase" : { + "field" : "_value" + } + } + ] + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: { + values: ["foo", "bar", "baz"] + } + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.values: ["FOO", "BAR", "BAZ"] }