ingest: Added foreach processor.

This processor is useful when all elements of a json array need to be processed in the same way.
This avoids that a processor needs to be defined for each element in an array.
Also it is very likely that it is unknown how many elements are inside an json array.
This commit is contained in:
Martijn van Groningen 2016-02-04 00:57:58 +01:00
parent 79ebe0682c
commit 7a6adfd93a
22 changed files with 748 additions and 144 deletions

View File

@ -34,9 +34,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData;
public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineRequest> {
@ -140,7 +138,7 @@ public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineReque
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorRegistry());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

View File

@ -33,10 +33,10 @@ public class IngestService implements Closeable {
private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;
private final ProcessorsRegistry processorsRegistry;
private final ProcessorsRegistry.Builder processorsRegistryBuilder;
public IngestService(Settings settings, ThreadPool threadPool, ProcessorsRegistry processorsRegistry) {
this.processorsRegistry = processorsRegistry;
public IngestService(Settings settings, ThreadPool threadPool, ProcessorsRegistry.Builder processorsRegistryBuilder) {
this.processorsRegistryBuilder = processorsRegistryBuilder;
this.pipelineStore = new PipelineStore(settings);
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
}
@ -50,7 +50,7 @@ public class IngestService implements Closeable {
}
public void setScriptService(ScriptService scriptService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, scriptService);
pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService);
}
@Override

View File

@ -19,7 +19,6 @@
package org.elasticsearch.ingest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
@ -48,12 +47,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener {
private final Pipeline.Factory factory = new Pipeline.Factory();
private Map<String, Processor.Factory> processorFactoryRegistry;
private ProcessorsRegistry processorRegistry;
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
@ -65,27 +63,16 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
super(settings);
}
public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, ScriptService scriptService) {
Map<String, Processor.Factory> processorFactories = new HashMap<>();
public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService) {
TemplateService templateService = new InternalTemplateService(scriptService);
for (Map.Entry<String, Function<TemplateService, Processor.Factory<?>>> entry : processorsRegistry.entrySet()) {
Processor.Factory processorFactory = entry.getValue().apply(templateService);
processorFactories.put(entry.getKey(), processorFactory);
}
this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);
this.processorRegistry = processorsRegistryBuilder.build(templateService);
}
@Override
public void close() throws IOException {
// TODO: When org.elasticsearch.node.Node can close Closable instances we should try to remove this code,
// since any wired closable should be able to close itself
List<Closeable> closeables = new ArrayList<>();
for (Processor.Factory factory : processorFactoryRegistry.values()) {
if (factory instanceof Closeable) {
closeables.add((Closeable) factory);
}
}
IOUtils.close(closeables);
processorRegistry.close();
}
@Override
@ -102,7 +89,7 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
Map<String, Pipeline> pipelines = new HashMap<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactoryRegistry));
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorRegistry));
} catch (ElasticsearchParseException e) {
throw e;
} catch (Exception e) {
@ -156,7 +143,7 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
// validates the pipeline and processor configuration before submitting a cluster update task:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
try {
factory.create(request.getId(), pipelineConfig, processorFactoryRegistry);
factory.create(request.getId(), pipelineConfig, processorRegistry);
} catch(Exception e) {
listener.onFailure(e);
return;
@ -199,8 +186,8 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
return pipelines.get(id);
}
public Map<String, Processor.Factory> getProcessorFactoryRegistry() {
return processorFactoryRegistry;
public ProcessorsRegistry getProcessorRegistry() {
return processorRegistry;
}
/**

View File

@ -19,29 +19,69 @@
package org.elasticsearch.ingest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.BiFunction;
public class ProcessorsRegistry {
public final class ProcessorsRegistry implements Closeable {
private final Map<String, Function<TemplateService, Processor.Factory<?>>> processorFactoryProviders = new HashMap<>();
private final Map<String, Processor.Factory> processorFactories;
/**
* Adds a processor factory under a specific name.
*/
public void registerProcessor(String name, Function<TemplateService, Processor.Factory<?>> processorFactoryProvider) {
Function<TemplateService, Processor.Factory<?>> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider);
if (provider != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
private ProcessorsRegistry(TemplateService templateService,
Map<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> providers) {
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (Map.Entry<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(templateService, this));
}
this.processorFactories = Collections.unmodifiableMap(processorFactories);
}
public Set<Map.Entry<String, Function<TemplateService, Processor.Factory<?>>>> entrySet() {
return processorFactoryProviders.entrySet();
public Processor.Factory getProcessorFactory(String name) {
return processorFactories.get(name);
}
@Override
public void close() throws IOException {
List<Closeable> closeables = new ArrayList<>();
for (Processor.Factory factory : processorFactories.values()) {
if (factory instanceof Closeable) {
closeables.add((Closeable) factory);
}
}
IOUtils.close(closeables);
}
// For testing:
Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}
public static final class Builder {
private final Map<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> providers = new HashMap<>();
/**
* Adds a processor factory under a specific name.
*/
public void registerProcessor(String name, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> provider) {
BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> previous = this.providers.putIfAbsent(name, provider);
if (previous != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
}
}
public ProcessorsRegistry build(TemplateService templateService) {
return new ProcessorsRegistry(templateService, providers);
}
}
}

View File

@ -19,9 +19,12 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.ProcessorsRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -178,4 +181,34 @@ public final class ConfigurationUtils {
}
return exception;
}
public static List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs, ProcessorsRegistry processorRegistry) throws Exception {
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue()));
}
}
}
return processors;
}
private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorRegistry.getProcessorFactory(type);
if (factory != null) {
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
Processor processor;
processor = factory.create(config);
if (!config.isEmpty()) {
throw new ElasticsearchParseException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessors.isEmpty()) {
return processor;
}
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
}
throw new ElasticsearchParseException("No processor type exists with name [" + type + "]");
}
}

View File

@ -20,8 +20,8 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.ProcessorsRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -85,12 +85,12 @@ public final class Pipeline {
public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
public Pipeline create(String id, Map<String, Object> config, ProcessorsRegistry processorRegistry) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = readProcessorConfigs(processorConfigs, processorRegistry);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
@ -98,35 +98,5 @@ public final class Pipeline {
return new Pipeline(id, description, compoundProcessor);
}
private List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs, Map<String, Processor.Factory> processorRegistry) throws Exception {
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue()));
}
}
}
return processors;
}
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorRegistry.get(type);
if (factory != null) {
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
Processor processor;
processor = factory.create(config);
if (!config.isEmpty()) {
throw new ElasticsearchParseException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessors.isEmpty()) {
return processor;
}
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
}
throw new ElasticsearchParseException("No processor type exists with name [" + type + "]");
}
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.core.AbstractProcessor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readList;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty;
/**
* A processor that for each value in a list executes a one or more processors.
*
* This can be useful in cases to do string operations on json array of strings,
* or remove a field from objects inside a json array.
*/
public final class ForEachProcessor extends AbstractProcessor {
public static final String TYPE = "foreach";
private final String field;
private final List<Processor> processors;
ForEachProcessor(String tag, String field, List<Processor> processors) {
super(tag);
this.field = field;
this.processors = processors;
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
List<Object> values = ingestDocument.getFieldValue(field, List.class);
List<Object> newValues = new ArrayList<>(values.size());
for (Object value : values) {
Map<String, Object> innerSource = new HashMap<>();
innerSource.put("_value", value);
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
innerSource.put(metaData.getFieldName(), ingestDocument.getSourceAndMetadata().get(metaData.getFieldName()));
}
IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata());
for (Processor processor : processors) {
processor.execute(innerIngestDocument);
}
newValues.add(innerSource.get("_value"));
}
ingestDocument.setFieldValue(field, newValues);
}
@Override
public String getType() {
return TYPE;
}
String getField() {
return field;
}
List<Processor> getProcessors() {
return processors;
}
public static final class Factory extends AbstractProcessorFactory<ForEachProcessor> {
private final ProcessorsRegistry processorRegistry;
public Factory(ProcessorsRegistry processorRegistry) {
this.processorRegistry = processorRegistry;
}
@Override
protected ForEachProcessor doCreate(String tag, Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
List<Map<String, Map<String, Object>>> processorConfigs = readList(TYPE, tag, config, "processors");
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry);
return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors));
}
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.ForEachProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
@ -41,7 +42,7 @@ import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService;
import java.util.function.Function;
import java.util.function.BiFunction;
/**
*
@ -50,7 +51,7 @@ public class NodeModule extends AbstractModule {
private final Node node;
private final MonitorService monitorService;
private final ProcessorsRegistry processorsRegistry;
private final ProcessorsRegistry.Builder processorsRegistryBuilder;
// pkg private so tests can mock
Class<? extends PageCacheRecycler> pageCacheRecyclerImpl = PageCacheRecycler.class;
@ -59,21 +60,22 @@ public class NodeModule extends AbstractModule {
public NodeModule(Node node, MonitorService monitorService) {
this.node = node;
this.monitorService = monitorService;
this.processorsRegistry = new ProcessorsRegistry();
this.processorsRegistryBuilder = new ProcessorsRegistry.Builder();
registerProcessor(DateProcessor.TYPE, (templateService) -> new DateProcessor.Factory());
registerProcessor(SetProcessor.TYPE, SetProcessor.Factory::new);
registerProcessor(AppendProcessor.TYPE, AppendProcessor.Factory::new);
registerProcessor(RenameProcessor.TYPE, (templateService) -> new RenameProcessor.Factory());
registerProcessor(RemoveProcessor.TYPE, RemoveProcessor.Factory::new);
registerProcessor(SplitProcessor.TYPE, (templateService) -> new SplitProcessor.Factory());
registerProcessor(JoinProcessor.TYPE, (templateService) -> new JoinProcessor.Factory());
registerProcessor(UppercaseProcessor.TYPE, (templateService) -> new UppercaseProcessor.Factory());
registerProcessor(LowercaseProcessor.TYPE, (templateService) -> new LowercaseProcessor.Factory());
registerProcessor(TrimProcessor.TYPE, (templateService) -> new TrimProcessor.Factory());
registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory());
registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new);
registerProcessor(DateProcessor.TYPE, (templateService, registry) -> new DateProcessor.Factory());
registerProcessor(SetProcessor.TYPE, (templateService, registry) -> new SetProcessor.Factory(templateService));
registerProcessor(AppendProcessor.TYPE, (templateService, registry) -> new AppendProcessor.Factory(templateService));
registerProcessor(RenameProcessor.TYPE, (templateService, registry) -> new RenameProcessor.Factory());
registerProcessor(RemoveProcessor.TYPE, (templateService, registry) -> new RemoveProcessor.Factory(templateService));
registerProcessor(SplitProcessor.TYPE, (templateService, registry) -> new SplitProcessor.Factory());
registerProcessor(JoinProcessor.TYPE, (templateService, registry) -> new JoinProcessor.Factory());
registerProcessor(UppercaseProcessor.TYPE, (templateService, registry) -> new UppercaseProcessor.Factory());
registerProcessor(LowercaseProcessor.TYPE, (templateService, registry) -> new LowercaseProcessor.Factory());
registerProcessor(TrimProcessor.TYPE, (templateService, registry) -> new TrimProcessor.Factory());
registerProcessor(ConvertProcessor.TYPE, (templateService, registry) -> new ConvertProcessor.Factory());
registerProcessor(GsubProcessor.TYPE, (templateService, registry) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, (templateService, registry) -> new FailProcessor.Factory(templateService));
registerProcessor(ForEachProcessor.TYPE, (templateService, registry) -> new ForEachProcessor.Factory(registry));
}
@Override
@ -92,7 +94,7 @@ public class NodeModule extends AbstractModule {
bind(Node.class).toInstance(node);
bind(MonitorService.class).toInstance(monitorService);
bind(NodeService.class).asEagerSingleton();
bind(ProcessorsRegistry.class).toInstance(processorsRegistry);
bind(ProcessorsRegistry.Builder.class).toInstance(processorsRegistryBuilder);
}
/**
@ -105,7 +107,7 @@ public class NodeModule extends AbstractModule {
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, Function<TemplateService, Processor.Factory<?>> processorFactoryProvider) {
processorsRegistry.registerProcessor(type, processorFactoryProvider);
public void registerProcessor(String type, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> provider) {
processorsRegistryBuilder.registerProcessor(type, provider);
}
}

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -78,7 +77,7 @@ public class NodeService extends AbstractComponent implements Closeable {
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService,
Discovery discovery, TransportService transportService, IndicesService indicesService,
PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version,
ProcessorsRegistry processorsRegistry, ClusterService clusterService, SettingsFilter settingsFilter) {
ProcessorsRegistry.Builder processorsRegistryBuilder, ClusterService clusterService, SettingsFilter settingsFilter) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
@ -89,7 +88,7 @@ public class NodeService extends AbstractComponent implements Closeable {
this.version = version;
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
this.ingestService = new IngestService(settings, threadPool, processorsRegistry);
this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder);
this.settingsFilter = settingsFilter;
clusterService.add(ingestService.getPipelineStore());
}

View File

@ -20,7 +20,9 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
@ -54,11 +56,12 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mock_processor", mock(Processor.Factory.class));
ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder();
processorRegistryBuilder.registerProcessor("mock_processor", ((templateService, registry) -> mock(Processor.Factory.class)));
ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(TestTemplateService.instance());
store = mock(PipelineStore.class);
when(store.get(SimulatePipelineRequest.SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry);
when(store.getProcessorRegistry()).thenReturn(processorRegistry);
}
public void testParseUsingPipelineStore() throws Exception {

View File

@ -244,7 +244,7 @@ public class IngestClientIT extends ESIntegTestCase {
}
public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor("test", templateService -> config ->
nodeModule.registerProcessor("test", (templateService, registry) -> config ->
new TestProcessor("id", "test", ingestDocument -> {
ingestDocument.setFieldValue("processed", true);
if (ingestDocument.getFieldValue("fail", Boolean.class)) {

View File

@ -50,9 +50,9 @@ public class PipelineStoreTests extends ESTestCase {
@Before
public void init() throws Exception {
store = new PipelineStore(Settings.EMPTY);
ProcessorsRegistry registry = new ProcessorsRegistry();
registry.registerProcessor("set", (templateService) -> new SetProcessor.Factory(TestTemplateService.instance()));
store.buildProcessorFactoryRegistry(registry, null);
ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder();
registryBuilder.registerProcessor("set", (templateService, registry) -> new SetProcessor.Factory(TestTemplateService.instance()));
store.buildProcessorFactoryRegistry(registryBuilder, null);
}
public void testUpdatePipelines() {

View File

@ -19,42 +19,30 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.test.ESTestCase;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
public class ProcessorsRegistryTests extends ESTestCase {
public void testAddProcessor() {
ProcessorsRegistry processorsRegistry = new ProcessorsRegistry();
public void testBuildProcessorRegistry() {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
TestProcessor.Factory factory1 = new TestProcessor.Factory();
processorsRegistry.registerProcessor("1", (templateService) -> factory1);
builder.registerProcessor("1", (templateService, registry) -> factory1);
TestProcessor.Factory factory2 = new TestProcessor.Factory();
processorsRegistry.registerProcessor("2", (templateService) -> factory2);
builder.registerProcessor("2", (templateService, registry) -> factory2);
TestProcessor.Factory factory3 = new TestProcessor.Factory();
try {
processorsRegistry.registerProcessor("1", (templateService) -> factory3);
builder.registerProcessor("1", (templateService, registry) -> factory3);
fail("addProcessor should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]"));
}
Set<Map.Entry<String, Function<TemplateService, Processor.Factory<?>>>> entrySet = processorsRegistry.entrySet();
assertThat(entrySet.size(), equalTo(2));
for (Map.Entry<String, Function<TemplateService, Processor.Factory<?>>> entry : entrySet) {
if (entry.getKey().equals("1")) {
assertThat(entry.getValue().apply(null), equalTo(factory1));
} else if (entry.getKey().equals("2")) {
assertThat(entry.getValue().apply(null), equalTo(factory2));
} else {
fail("unexpected processor id [" + entry.getKey() + "]");
}
}
ProcessorsRegistry registry = builder.build(TestTemplateService.instance());
assertThat(registry.getProcessorFactories().size(), equalTo(2));
assertThat(registry.getProcessorFactory("1"), sameInstance(factory1));
assertThat(registry.getProcessorFactory("2"), sameInstance(factory2));
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -31,6 +33,9 @@ import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
public class ConfigurationUtilsTests extends ESTestCase {
@ -68,4 +73,31 @@ public class ConfigurationUtilsTests extends ESTestCase {
List<String> val = ConfigurationUtils.readList(null, null, config, "int");
assertThat(val, equalTo(Collections.singletonList(2)));
}
public void testReadProcessors() throws Exception {
Processor processor = mock(Processor.class);
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
builder.registerProcessor("test_processor", (templateService, registry) -> config -> processor);
ProcessorsRegistry registry = builder.build(TestTemplateService.instance());
List<Map<String, Map<String, Object>>> config = new ArrayList<>();
Map<String, Object> emptyConfig = Collections.emptyMap();
config.add(Collections.singletonMap("test_processor", emptyConfig));
config.add(Collections.singletonMap("test_processor", emptyConfig));
List<Processor> result = ConfigurationUtils.readProcessorConfigs(config, registry);
assertThat(result.size(), equalTo(2));
assertThat(result.get(0), sameInstance(processor));
assertThat(result.get(1), sameInstance(processor));
config.add(Collections.singletonMap("unknown_processor", emptyConfig));
try {
ConfigurationUtils.readProcessorConfigs(config, registry);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]"));
}
}
}

View File

@ -20,13 +20,16 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.prefs.PreferencesFactory;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@ -41,7 +44,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -57,7 +60,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
Pipeline.Factory factory = new Pipeline.Factory();
try {
factory.create("_id", pipelineConfig, Collections.emptyMap());
factory.create("_id", pipelineConfig, createProcessorRegistry(Collections.emptyMap()));
fail("should fail, missing required [processors] field");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@ -71,7 +74,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -88,7 +91,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (ElasticsearchParseException e) {
@ -104,11 +107,19 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound"));
}
private ProcessorsRegistry createProcessorRegistry(Map<String, Processor.Factory> processorRegistry) {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
for (Map.Entry<String, Processor.Factory> entry : processorRegistry.entrySet()) {
builder.registerProcessor(entry.getKey(), ((templateService, registry) -> entry.getValue()));
}
return builder.build(TestTemplateService.instance());
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ForEachProcessorFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
Processor processor = new TestProcessor(ingestDocument -> {});
builder.registerProcessor("_name", (templateService, registry) -> config -> processor);
ProcessorsRegistry registry = builder.build(TestTemplateService.instance());
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(registry);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
ForEachProcessor forEachProcessor = forEachFactory.create(config);
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), Matchers.equalTo("_field"));
assertThat(forEachProcessor.getProcessors().size(), Matchers.equalTo(1));
assertThat(forEachProcessor.getProcessors().get(0), Matchers.sameInstance(processor));
config = new HashMap<>();
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
try {
forEachFactory.create(config);
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
}
config = new HashMap<>();
config.put("field", "_field");
try {
forEachFactory.create(config);
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), Matchers.equalTo("[processors] required property is missing"));
}
}
}

View File

@ -0,0 +1,169 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class ForEachProcessorTests extends ESTestCase {
public void testExecute() throws Exception {
List<String> values = new ArrayList<>();
values.add("foo");
values.add("bar");
values.add("baz");
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", Collections.singletonList(new UppercaseProcessor("_tag", "_value"))
);
processor.execute(ingestDocument);
List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.get(0), equalTo("FOO"));
assertThat(result.get(1), equalTo("BAR"));
assertThat(result.get(2), equalTo("BAZ"));
}
public void testExecuteWithFailure() throws Exception {
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c"))
);
TestProcessor testProcessor = new TestProcessor(id -> {
if ("c".equals(id.getFieldValue("_value", String.class))) {
throw new RuntimeException("failure");
}
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(testProcessor));
try {
processor.execute(ingestDocument);
fail("exception expected");
} catch (RuntimeException e) {
assertThat(e.getMessage(), equalTo("failure"));
}
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c")));
testProcessor = new TestProcessor(id -> {
String value = id.getFieldValue("_value", String.class);
if ("c".equals(value)) {
throw new RuntimeException("failure");
} else {
id.setFieldValue("_value", value.toUpperCase(Locale.ROOT));
}
});
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
processor = new ForEachProcessor(
"_tag", "values",
Collections.singletonList(new CompoundProcessor(Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)))
);
processor.execute(ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c")));
}
public void testMetaDataAvailable() throws Exception {
List<Map<String, Object>> values = new ArrayList<>();
values.add(new HashMap<>());
values.add(new HashMap<>());
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
TestProcessor innerProcessor = new TestProcessor(id -> {
id.setFieldValue("_value.index", id.getSourceAndMetadata().get("_index"));
id.setFieldValue("_value.type", id.getSourceAndMetadata().get("_type"));
id.setFieldValue("_value.id", id.getSourceAndMetadata().get("_id"));
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(innerProcessor));
processor.execute(ingestDocument);
assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index"));
assertThat(ingestDocument.getFieldValue("values.0.type", String.class), equalTo("_type"));
assertThat(ingestDocument.getFieldValue("values.0.id", String.class), equalTo("_id"));
assertThat(ingestDocument.getFieldValue("values.1.index", String.class), equalTo("_index"));
assertThat(ingestDocument.getFieldValue("values.1.type", String.class), equalTo("_type"));
assertThat(ingestDocument.getFieldValue("values.1.id", String.class), equalTo("_id"));
}
public void testRandom() throws Exception {
int numProcessors = randomInt(8);
List<Processor> processors = new ArrayList<>(numProcessors);
for (int i = 0; i < numProcessors; i++) {
processors.add(new Processor() {
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
String existingValue = ingestDocument.getFieldValue("_value", String.class);
ingestDocument.setFieldValue("_value", existingValue + ".");
}
@Override
public String getType() {
return null;
}
@Override
public String getTag() {
return null;
}
});
}
int numValues = randomIntBetween(1, 32);
List<String> values = new ArrayList<>(numValues);
for (int i = 0; i < numValues; i++) {
values.add("");
}
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", processors);
processor.execute(ingestDocument);
List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.size(), equalTo(numValues));
String expectedString = "";
for (int i = 0; i < numProcessors; i++) {
expectedString = expectedString + ".";
}
for (String r : result) {
assertThat(r, equalTo(expectedString));
}
}
}

View File

@ -534,6 +534,151 @@ to the requester.
}
--------------------------------------------------
==== Foreach processor
All processors can operate on elements inside an array, but if all elements of an array need to
be processed in the same way defining a processor for each element becomes cumbersome and tricky
because it is likely that the number of elements in an array are unknown. For this reason the `foreach`
processor is exists. By specifying the field holding array elements and a list of processors that
define what should happen to each element, array field can easily be preprocessed.
Processors inside the foreach processor work in a different context and the only valid top level
field is `_value`, which holds the array element value. Under this field other fields may exist.
If the `foreach` processor failed to process an element inside the array and no `on_failure` processor has been specified
then it aborts the execution and leaves the array unmodified.
[[foreach-options]]
.Foreach Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The array field
| `processors` | yes | - | The processors
|======
Assume the following document:
[source,js]
--------------------------------------------------
{
"value" : ["foo", "bar", "baz"]
}
--------------------------------------------------
When this `foreach` processor operates on this sample document:
[source,js]
--------------------------------------------------
{
"foreach" : {
"field" : "values",
"processors" : [
{
"uppercase" : {
"field" : "_value"
}
}
]
}
}
--------------------------------------------------
Then the document will look like this after preprocessing:
[source,js]
--------------------------------------------------
{
"value" : ["FOO", "BAR", "BAZ"]
}
--------------------------------------------------
Lets take a look at another example:
[source,js]
--------------------------------------------------
{
"persons" : [
{
"id" : "1",
"name" : "John Doe"
},
{
"id" : "2",
"name" : "Jane Doe"
}
]
}
--------------------------------------------------
and in the case the `id` field needs to be removed
then the following `foreach` processor can be used:
[source,js]
--------------------------------------------------
{
"foreach" : {
"field" : "persons",
"processors" : [
{
"remove" : {
"field" : "_value.id"
}
}
]
}
}
--------------------------------------------------
After preprocessing the result is:
[source,js]
--------------------------------------------------
{
"persons" : [
{
"name" : "John Doe"
},
{
"name" : "Jane Doe"
}
]
}
--------------------------------------------------
Like on any processor `on_failure` processors can also be defined
in processors that wrapped inside the `foreach` processor.
For example the `id` field may not exist on all person objects and
instead of failing the index request, the document will be send to
the 'failure_index' index for later inspection:
[source,js]
--------------------------------------------------
{
"foreach" : {
"field" : "persons",
"processors" : [
{
"remove" : {
"field" : "_value.id",
"on_failure" : [
{
"set" : {
"field", "_index",
"value", "failure_index"
}
}
]
}
}
]
}
}
--------------------------------------------------
In this example if the `remove` processor does fail then
the array elements that have been processed thus far will
be updated.
=== Accessing data in pipelines

View File

@ -56,7 +56,7 @@ public class IngestGrokPlugin extends Plugin {
}
public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService) -> new GrokProcessor.Factory(builtinPatterns));
nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService, registry) -> new GrokProcessor.Factory(builtinPatterns));
}
public static Map<String, String> loadBuiltinPatterns() throws IOException {

View File

@ -50,7 +50,7 @@ public class IngestGeoIpPlugin extends Plugin {
public void onModule(NodeModule nodeModule) throws IOException {
Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip");
Map<String, DatabaseReader> databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService) -> new GeoIpProcessor.Factory(databaseReaders));
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService, registry) -> new GeoIpProcessor.Factory(databaseReaders));
}
public static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {

View File

@ -36,6 +36,7 @@ import org.elasticsearch.ingest.grok.IngestGrokPlugin;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.ForEachProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
@ -49,6 +50,7 @@ import java.io.ByteArrayInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -155,10 +157,17 @@ public class CombineProcessorsTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testMutate() throws Exception {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
builder.registerProcessor("remove", (templateService, registry) -> new RemoveProcessor.Factory(templateService));
builder.registerProcessor("trim", (templateService, registry) -> new TrimProcessor.Factory());
ProcessorsRegistry registry = builder.build(TestTemplateService.instance());
Map<String, Object> config = new HashMap<>();
// TODO: when we add foreach processor we should delete all friends.id fields
config.put("field", "friends.0.id");
RemoveProcessor processor1 = new RemoveProcessor.Factory(TestTemplateService.instance()).create(config);
config.put("field", "friends");
Map<String, Object> removeConfig = new HashMap<>();
removeConfig.put("field", "_value.id");
config.put("processors", Collections.singletonList(Collections.singletonMap("remove", removeConfig)));
ForEachProcessor processor1 = new ForEachProcessor.Factory(registry).create(config);
config = new HashMap<>();
config.put("field", "tags");
config.put("value", "new_value");
@ -168,9 +177,11 @@ public class CombineProcessorsTests extends ESTestCase {
config.put("separator", ",");
SplitProcessor processor3 = new SplitProcessor.Factory().create(config);
config = new HashMap<>();
// TODO: when we add foreach processor, then change the test to trim all address values
config.put("field", "address.1");
TrimProcessor processor4 = new TrimProcessor.Factory().create(config);
config.put("field", "address");
Map<String, Object> trimConfig = new HashMap<>();
trimConfig.put("field", "_value");
config.put("processors", Collections.singletonList(Collections.singletonMap("trim", trimConfig)));
ForEachProcessor processor4 = new ForEachProcessor.Factory(registry).create(config);
config = new HashMap<>();
config.put("field", "company");
LowercaseProcessor processor5 = new LowercaseProcessor.Factory().create(config);
@ -190,16 +201,16 @@ public class CombineProcessorsTests extends ESTestCase {
pipeline.execute(document);
assertThat(((List<Map<String, Object>>) document.getSourceAndMetadata().get("friends")).get(0).get("id"), nullValue());
assertThat(((List<Map<String, Object>>) document.getSourceAndMetadata().get("friends")).get(1).get("id"), equalTo(1));
assertThat(((List<Map<String, Object>>) document.getSourceAndMetadata().get("friends")).get(2).get("id"), equalTo(2));
assertThat(((List<Map<String, Object>>) document.getSourceAndMetadata().get("friends")).get(1).get("id"), nullValue());
assertThat(((List<Map<String, Object>>) document.getSourceAndMetadata().get("friends")).get(2).get("id"), nullValue());
assertThat(document.getFieldValue("tags.7", String.class), equalTo("new_value"));
List<String> addressDetails = document.getFieldValue("address", List.class);
assertThat(addressDetails.size(), equalTo(4));
assertThat(addressDetails.get(0), equalTo("713 Bartlett Place"));
assertThat(addressDetails.get(1), equalTo("Accoville"));
assertThat(addressDetails.get(2), equalTo(" Puerto Rico"));
assertThat(addressDetails.get(3), equalTo(" 9221"));
assertThat(addressDetails.get(2), equalTo("Puerto Rico"));
assertThat(addressDetails.get(3), equalTo("9221"));
assertThat(document.getSourceAndMetadata().get("company"), equalTo("atgen"));
assertThat(document.getSourceAndMetadata().get("gender"), equalTo("MALE"));

View File

@ -0,0 +1,41 @@
---
"Test foreach Processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"foreach" : {
"field" : "values",
"processors" : [
{
"uppercase" : {
"field" : "_value"
}
}
]
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {
values: ["foo", "bar", "baz"]
}
- do:
get:
index: test
type: test
id: 1
- match: { _source.values: ["FOO", "BAR", "BAZ"] }