Merge pull request #19226 from rjernst/ingest_plugin_api

Add IngestPlugin api for plugins to add ingest processors
This commit is contained in:
Ryan Ernst 2016-07-05 21:04:44 -07:00 committed by GitHub
commit 52ef3666e0
63 changed files with 471 additions and 510 deletions

View File

@ -141,7 +141,7 @@ public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineReque
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorRegistry());
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

View File

@ -39,8 +39,8 @@ public final class ConfigurationUtils {
*
* If the property value isn't of type string a {@link ElasticsearchParseException} is thrown.
*/
public static String readOptionalStringProperty(String processorType, String processorTag, Map<String, Object> configuration,
String propertyName) {
public static String readOptionalStringProperty(String processorType, String processorTag,
Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
return readString(processorType, processorTag, propertyName, value);
}
@ -112,8 +112,8 @@ public final class ConfigurationUtils {
* If the property value isn't of type int a {@link ElasticsearchParseException} is thrown.
* If the property is missing an {@link ElasticsearchParseException} is thrown
*/
public static int readIntProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName,
int defaultValue) {
public static int readIntProperty(String processorType, String processorTag, Map<String, Object> configuration,
String propertyName, int defaultValue) {
Object value = configuration.remove(propertyName);
if (value == null) {
return defaultValue;
@ -146,7 +146,8 @@ public final class ConfigurationUtils {
* If the property value isn't of type list an {@link ElasticsearchParseException} is thrown.
* If the property is missing an {@link ElasticsearchParseException} is thrown
*/
public static <T> List<T> readList(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
public static <T> List<T> readList(String processorType, String processorTag, Map<String, Object> configuration,
String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw newConfigurationException(processorType, processorTag, propertyName, "required property is missing");
@ -211,7 +212,8 @@ public final class ConfigurationUtils {
/**
* Returns and removes the specified property as an {@link Object} from the specified configuration map.
*/
public static Object readObject(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
public static Object readObject(String processorType, String processorTag, Map<String, Object> configuration,
String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw newConfigurationException(processorType, processorTag, propertyName, "required property is missing");
@ -219,8 +221,8 @@ public final class ConfigurationUtils {
return value;
}
public static ElasticsearchParseException newConfigurationException(String processorType, String processorTag, String propertyName,
String reason) {
public static ElasticsearchParseException newConfigurationException(String processorType, String processorTag,
String propertyName, String reason) {
ElasticsearchParseException exception = new ElasticsearchParseException("[" + propertyName + "] " + reason);
if (processorType != null) {
@ -236,12 +238,12 @@ public final class ConfigurationUtils {
}
public static List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs,
ProcessorsRegistry processorRegistry) throws Exception {
Map<String, Processor.Factory> processorFactories) throws Exception {
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue()));
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
}
}
}
@ -249,16 +251,17 @@ public final class ConfigurationUtils {
return processors;
}
private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorRegistry.getProcessorFactory(type);
private static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorFactories.get(type);
if (factory != null) {
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
Processor processor = factory.create(tag, config);
Processor processor = factory.create(processorFactories, tag, config);
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(processor.getType(), processor.getTag(), Pipeline.ON_FAILURE_KEY,

View File

@ -19,16 +19,18 @@
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
/**
* Holder class for several ingest related services.
@ -37,11 +39,20 @@ public class IngestService {
private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;
private final ProcessorsRegistry.Builder processorsRegistryBuilder;
public IngestService(Settings settings, ThreadPool threadPool, ProcessorsRegistry.Builder processorsRegistryBuilder) {
this.processorsRegistryBuilder = processorsRegistryBuilder;
this.pipelineStore = new PipelineStore(settings);
public IngestService(Settings settings, ThreadPool threadPool,
Environment env, ScriptService scriptService, List<IngestPlugin> ingestPlugins) {
final TemplateService templateService = new InternalTemplateService(scriptService);
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (IngestPlugin ingestPlugin : ingestPlugins) {
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(env, scriptService, templateService);
for (Map.Entry<String, Processor.Factory> entry : newProcessors.entrySet()) {
if (processorFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered");
}
}
}
this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories));
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
}
@ -53,12 +64,8 @@ public class IngestService {
return pipelineExecutionService;
}
public void buildProcessorsFactoryRegistry(ScriptService scriptService, ClusterService clusterService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService, clusterService);
}
public IngestInfo info() {
Map<String, Processor.Factory> processorFactories = pipelineStore.getProcessorRegistry().getProcessorFactories();
Map<String, Processor.Factory> processorFactories = pipelineStore.getProcessorFactories();
List<ProcessorInfo> processorInfoList = new ArrayList<>(processorFactories.size());
for (Map.Entry<String, Processor.Factory> entry : processorFactories.entrySet()) {
processorInfoList.add(new ProcessorInfo(entry.getKey()));

View File

@ -98,13 +98,13 @@ public final class Pipeline {
public static final class Factory {
public Pipeline create(String id, Map<String, Object> config, ProcessorsRegistry processorRegistry) throws Exception {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorFactories) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("pipeline [" + id +
"] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));

View File

@ -19,6 +19,12 @@
package org.elasticsearch.ingest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@ -37,20 +43,11 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ScriptService;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PipelineStore extends AbstractComponent implements ClusterStateListener {
private final Pipeline.Factory factory = new Pipeline.Factory();
private ProcessorsRegistry processorRegistry;
private final Map<String, Processor.Factory> processorFactories;
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
@ -58,13 +55,9 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
volatile Map<String, Pipeline> pipelines = new HashMap<>();
public PipelineStore(Settings settings) {
public PipelineStore(Settings settings, Map<String, Processor.Factory> processorFactories) {
super(settings);
}
public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService,
ClusterService clusterService) {
this.processorRegistry = processorsRegistryBuilder.build(scriptService, clusterService);
this.processorFactories = processorFactories;
}
@Override
@ -81,7 +74,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
Map<String, Pipeline> pipelines = new HashMap<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorRegistry));
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
} catch (ElasticsearchParseException e) {
throw e;
} catch (Exception e) {
@ -157,7 +150,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
}
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorRegistry);
Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories);
List<IllegalArgumentException> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
@ -194,8 +187,8 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
return pipelines.get(id);
}
public ProcessorsRegistry getProcessorRegistry() {
return processorRegistry;
public Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}
/**

View File

@ -50,12 +50,14 @@ public interface Processor {
/**
* Creates a processor based on the specified map of maps config.
*
* @param processorFactories Other processors which may be created inside this processor
* @param tag The tag for the processor
* @param config Configuration for the processor to create
* @param config The configuration for the processor
*
* Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can
* verify if all configurations settings have been used.
* <b>Note:</b> Implementations are responsible for removing the used configuration keys, so that after
* creating a pipeline ingest can verify if all configurations settings have been used.
*/
Processor create(String tag, Map<String, Object> config) throws Exception;
Processor create(Map<String, Processor.Factory> processorFactories, String tag,
Map<String, Object> config) throws Exception;
}
}

View File

@ -1,91 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
public final class ProcessorsRegistry {
private final Map<String, Processor.Factory> processorFactories;
private final TemplateService templateService;
private final ScriptService scriptService;
private final ClusterService clusterService;
private ProcessorsRegistry(ScriptService scriptService, ClusterService clusterService,
Map<String, Function<ProcessorsRegistry, Processor.Factory>> providers) {
this.templateService = new InternalTemplateService(scriptService);
this.scriptService = scriptService;
this.clusterService = clusterService;
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (Map.Entry<String, Function<ProcessorsRegistry, Processor.Factory>> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(this));
}
this.processorFactories = Collections.unmodifiableMap(processorFactories);
}
public TemplateService getTemplateService() {
return templateService;
}
public ScriptService getScriptService() {
return scriptService;
}
public ClusterService getClusterService() {
return clusterService;
}
public Processor.Factory getProcessorFactory(String name) {
return processorFactories.get(name);
}
// For testing:
Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}
public static final class Builder {
private final Map<String, Function<ProcessorsRegistry, Processor.Factory>> providers = new HashMap<>();
/**
* Adds a processor factory under a specific name.
*/
public void registerProcessor(String name, Function<ProcessorsRegistry, Processor.Factory> provider) {
Function<ProcessorsRegistry, Processor.Factory> previous = this.providers.putIfAbsent(name, provider);
if (previous != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
}
}
public ProcessorsRegistry build(ScriptService scriptService, ClusterService clusterService) {
return new ProcessorsRegistry(scriptService, clusterService, providers);
}
}
}

View File

@ -86,12 +86,14 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
@ -265,6 +267,9 @@ public class Node implements Closeable {
final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId());
resourcesToClose.add(tribeService);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
final IngestService ingestService = new IngestService(settings, threadPool, environment,
scriptModule.getScriptService(), pluginsService.filterPlugins(IngestPlugin.class));
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
@ -304,6 +309,7 @@ public class Node implements Closeable {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
}
);
injector = modules.createInjector();

View File

@ -20,28 +20,17 @@
package org.elasticsearch.node;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService;
import java.util.function.Function;
/**
*
*/
public class NodeModule extends AbstractModule {
private final Node node;
private final MonitorService monitorService;
private final ProcessorsRegistry.Builder processorsRegistryBuilder;
public NodeModule(Node node, MonitorService monitorService) {
this.node = node;
this.monitorService = monitorService;
this.processorsRegistryBuilder = new ProcessorsRegistry.Builder();
}
@Override
@ -49,20 +38,5 @@ public class NodeModule extends AbstractModule {
bind(Node.class).toInstance(node);
bind(MonitorService.class).toInstance(monitorService);
bind(NodeService.class).asEagerSingleton();
bind(ProcessorsRegistry.Builder.class).toInstance(processorsRegistryBuilder);
}
/**
* Returns the node
*/
public Node getNode() {
return node;
}
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, Function<ProcessorsRegistry, Processor.Factory> provider) {
processorsRegistryBuilder.registerProcessor(type, provider);
}
}

View File

@ -38,7 +38,6 @@ import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.ScriptService;
@ -57,7 +56,6 @@ public class NodeService extends AbstractComponent implements Closeable {
private final CircuitBreakerService circuitBreakerService;
private final IngestService ingestService;
private final SettingsFilter settingsFilter;
private ClusterService clusterService;
private ScriptService scriptService;
@Nullable
@ -68,9 +66,8 @@ public class NodeService extends AbstractComponent implements Closeable {
@Inject
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
TransportService transportService, IndicesService indicesService, PluginsService pluginService,
CircuitBreakerService circuitBreakerService, @Nullable HttpServer httpServer,
ProcessorsRegistry.Builder processorsRegistryBuilder, ClusterService clusterService,
SettingsFilter settingsFilter) {
CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServer httpServer,
IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
@ -80,20 +77,13 @@ public class NodeService extends AbstractComponent implements Closeable {
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
this.httpServer = httpServer;
this.clusterService = clusterService;
this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder);
this.ingestService = ingestService;
this.settingsFilter = settingsFilter;
this.scriptService = scriptService;
clusterService.add(ingestService.getPipelineStore());
clusterService.add(ingestService.getPipelineExecutionService());
}
// can not use constructor injection or there will be a circular dependency
@Inject(optional = true)
public void setScriptService(ScriptService scriptService) {
this.scriptService = scriptService;
this.ingestService.buildProcessorsFactoryRegistry(scriptService, clusterService);
}
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) {
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(),

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugins;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.script.ScriptService;
/**
* An extension point for {@link Plugin} implementations to add custom ingest processors
*/
public interface IngestPlugin {
/**
* Returns additional ingest processor types added by this plugin.
*
* The key of the returned {@link Map} is the unique name for the processor which is specified
* in pipeline configurations, and the value is a {@link org.elasticsearch.ingest.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
return Collections.emptyMap();
}
}

View File

@ -19,19 +19,6 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -40,6 +27,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields;
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID;
import static org.elasticsearch.ingest.IngestDocument.MetaData.ID;
@ -59,12 +55,11 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder();
processorRegistryBuilder.registerProcessor("mock_processor", ((registry) -> mock(Processor.Factory.class)));
ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(mock(ScriptService.class), mock(ClusterService.class));
Map<String, Processor.Factory> registry =
Collections.singletonMap("mock_processor", (factories, tag, config) -> processor);
store = mock(PipelineStore.class);
when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.getProcessorRegistry()).thenReturn(processorRegistry);
when(store.getProcessorFactories()).thenReturn(registry);
}
public void testParseUsingPipelineStore() throws Exception {

View File

@ -19,12 +19,6 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -32,11 +26,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
public class ConfigurationUtilsTests extends ESTestCase {
private Map<String, Object> config;
@ -95,10 +92,8 @@ public class ConfigurationUtilsTests extends ESTestCase {
public void testReadProcessors() throws Exception {
Processor processor = mock(Processor.class);
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
builder.registerProcessor("test_processor", (registry) -> (tag, config) -> processor);
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));
Map<String, Processor.Factory> registry =
Collections.singletonMap("test_processor", (factories, tag, config) -> processor);
List<Map<String, Map<String, Object>>> config = new ArrayList<>();
Map<String, Object> emptyConfig = Collections.emptyMap();

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
public class IngestServiceTests extends ESTestCase {
private final IngestPlugin DUMMY_PLUGIN = new IngestPlugin() {
@Override
public Map<String, Processor.Factory> getProcessors(Environment env, ScriptService scriptService, TemplateService templateService) {
return Collections.singletonMap("foo", (factories, tag, config) -> null);
}
};
public void testIngestPlugin() {
IngestService ingestService = new IngestService(Settings.EMPTY, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
assertTrue(factories.containsKey("foo"));
assertEquals(1, factories.size());
}
public void testIngestPluginDuplicate() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new IngestService(Settings.EMPTY, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
);
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
}
}

View File

@ -47,7 +47,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -63,7 +63,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
Pipeline.Factory factory = new Pipeline.Factory();
try {
factory.create("_id", pipelineConfig, createProcessorRegistry(Collections.emptyMap()));
factory.create("_id", pipelineConfig, Collections.emptyMap());
fail("should fail, missing required [processors] field");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@ -88,7 +88,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -105,7 +105,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined"));
}
@ -117,7 +117,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty"));
}
@ -126,7 +126,7 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("ignore_failure", true);
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
@ -151,7 +151,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
@ -164,7 +164,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -181,12 +181,4 @@ public class PipelineFactoryTests extends ESTestCase {
List<Processor> flattened = pipeline.flattenAllProcessors();
assertThat(flattened.size(), equalTo(4));
}
private ProcessorsRegistry createProcessorRegistry(Map<String, Processor.Factory> processorRegistry) {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
for (Map.Entry<String, Processor.Factory> entry : processorRegistry.entrySet()) {
builder.registerProcessor(entry.getKey(), ((registry) -> entry.getValue()));
}
return builder.build(mock(ScriptService.class), mock(ClusterService.class));
}
}

View File

@ -56,9 +56,8 @@ public class PipelineStoreTests extends ESTestCase {
@Before
public void init() throws Exception {
store = new PipelineStore(Settings.EMPTY);
ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder();
registryBuilder.registerProcessor("set", (registry) -> (tag, config) -> {
Map<String, Processor.Factory> processorFactories = new HashMap<>();
processorFactories.put("set", (factories, tag, config) -> {
String field = (String) config.remove("field");
String value = (String) config.remove("value");
return new Processor() {
@ -78,7 +77,7 @@ public class PipelineStoreTests extends ESTestCase {
}
};
});
registryBuilder.registerProcessor("remove", (registry) -> (tag, config) -> {
processorFactories.put("remove", (factories, tag, config) -> {
String field = (String) config.remove("field");
return new Processor() {
@Override
@ -97,7 +96,7 @@ public class PipelineStoreTests extends ESTestCase {
}
};
});
store.buildProcessorFactoryRegistry(registryBuilder, mock(ScriptService.class), mock(ClusterService.class));
store = new PipelineStore(Settings.EMPTY, processorFactories);
}
public void testUpdatePipelines() {

View File

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.mockito.Mockito.mock;
public class ProcessorsRegistryTests extends ESTestCase {
public void testBuildProcessorRegistry() {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
TestProcessor.Factory factory1 = new TestProcessor.Factory();
builder.registerProcessor("1", (registry) -> factory1);
TestProcessor.Factory factory2 = new TestProcessor.Factory();
builder.registerProcessor("2", (registry) -> factory2);
TestProcessor.Factory factory3 = new TestProcessor.Factory();
try {
builder.registerProcessor("1", (registry) -> factory3);
fail("addProcessor should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]"));
}
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));
assertThat(registry.getProcessorFactories().size(), equalTo(2));
assertThat(registry.getProcessorFactory("1"), sameInstance(factory1));
assertThat(registry.getProcessorFactory("2"), sameInstance(factory2));
}
}

View File

@ -61,9 +61,10 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
}
@Override
public AbstractStringProcessor create(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field");
return newProcessor(processorTag, field);
public AbstractStringProcessor create(Map<String, Processor.Factory> registry, String tag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(processorType, tag, config, "field");
return newProcessor(tag, field);
}
protected abstract AbstractStringProcessor newProcessor(String processorTag, String field);

View File

@ -74,7 +74,8 @@ public final class AppendProcessor extends AbstractProcessor {
}
@Override
public AppendProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public AppendProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));

View File

@ -162,7 +162,8 @@ public final class ConvertProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
@Override
public ConvertProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public ConvertProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", field);

View File

@ -19,6 +19,14 @@
package org.elasticsearch.ingest.common;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IllformedLocaleException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
@ -29,14 +37,6 @@ import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IllformedLocaleException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
public final class DateIndexNameProcessor extends AbstractProcessor {
public static final String TYPE = "date_index_name";
@ -123,7 +123,8 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
@Override
public DateIndexNameProcessor create(String tag, Map<String, Object> config) throws Exception {
public DateIndexNameProcessor create(Map<String, Processor.Factory> registry, String tag,
Map<String, Object> config) throws Exception {
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "locale");
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);

View File

@ -111,7 +111,8 @@ public final class DateProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
@SuppressWarnings("unchecked")
public DateProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public DateProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone");

View File

@ -65,7 +65,8 @@ public final class FailProcessor extends AbstractProcessor {
}
@Override
public FailProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public FailProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message");
return new FailProcessor(processorTag, templateService.compile(message));
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.Collections;
@ -83,18 +83,12 @@ public final class ForEachProcessor extends AbstractProcessor {
}
public static final class Factory implements Processor.Factory {
private final ProcessorsRegistry processorRegistry;
public Factory(ProcessorsRegistry processorRegistry) {
this.processorRegistry = processorRegistry;
}
@Override
public ForEachProcessor create(String tag, Map<String, Object> config) throws Exception {
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
List<Map<String, Map<String, Object>>> processorConfigs = readList(TYPE, tag, config, "processors");
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, factories);
return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors));
}
}

View File

@ -123,7 +123,8 @@ public final class GrokProcessor extends AbstractProcessor {
}
@Override
public GrokProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public GrokProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
List<String> matchPatterns = ConfigurationUtils.readList(TYPE, processorTag, config, "patterns");
boolean traceMatch = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trace_match", false);

View File

@ -80,7 +80,8 @@ public final class GsubProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
@Override
public GsubProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public GsubProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, processorTag, config, "field");
String pattern = readStringProperty(TYPE, processorTag, config, "pattern");
String replacement = readStringProperty(TYPE, processorTag, config, "replacement");

View File

@ -19,9 +19,6 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -31,9 +28,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class IngestCommonPlugin extends Plugin {
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
public static final String NAME = "ingest-common";
public class IngestCommonPlugin extends Plugin implements IngestPlugin {
private final Map<String, String> builtinPatterns;
@ -41,26 +43,29 @@ public class IngestCommonPlugin extends Plugin {
this.builtinPatterns = loadBuiltinPatterns();
}
public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor(DateProcessor.TYPE, (registry) -> new DateProcessor.Factory());
nodeModule.registerProcessor(SetProcessor.TYPE, (registry) -> new SetProcessor.Factory(registry.getTemplateService()));
nodeModule.registerProcessor(AppendProcessor.TYPE, (registry) -> new AppendProcessor.Factory(registry.getTemplateService()));
nodeModule.registerProcessor(RenameProcessor.TYPE, (registry) -> new RenameProcessor.Factory());
nodeModule.registerProcessor(RemoveProcessor.TYPE, (registry) -> new RemoveProcessor.Factory(registry.getTemplateService()));
nodeModule.registerProcessor(SplitProcessor.TYPE, (registry) -> new SplitProcessor.Factory());
nodeModule.registerProcessor(JoinProcessor.TYPE, (registry) -> new JoinProcessor.Factory());
nodeModule.registerProcessor(UppercaseProcessor.TYPE, (registry) -> new UppercaseProcessor.Factory());
nodeModule.registerProcessor(LowercaseProcessor.TYPE, (registry) -> new LowercaseProcessor.Factory());
nodeModule.registerProcessor(TrimProcessor.TYPE, (registry) -> new TrimProcessor.Factory());
nodeModule.registerProcessor(ConvertProcessor.TYPE, (registry) -> new ConvertProcessor.Factory());
nodeModule.registerProcessor(GsubProcessor.TYPE, (registry) -> new GsubProcessor.Factory());
nodeModule.registerProcessor(FailProcessor.TYPE, (registry) -> new FailProcessor.Factory(registry.getTemplateService()));
nodeModule.registerProcessor(ForEachProcessor.TYPE, (registry) -> new ForEachProcessor.Factory(registry));
nodeModule.registerProcessor(DateIndexNameProcessor.TYPE, (registry) -> new DateIndexNameProcessor.Factory());
nodeModule.registerProcessor(SortProcessor.TYPE, (registry) -> new SortProcessor.Factory());
nodeModule.registerProcessor(GrokProcessor.TYPE, (registry) -> new GrokProcessor.Factory(builtinPatterns));
nodeModule.registerProcessor(ScriptProcessor.TYPE, (registry) ->
new ScriptProcessor.Factory(registry.getScriptService()));
@Override
public Map<String, Processor.Factory> getProcessors(Environment env, ScriptService scriptService,
TemplateService templateService) {
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(DateProcessor.TYPE, new DateProcessor.Factory());
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(templateService));
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(templateService));
processors.put(RenameProcessor.TYPE, new RenameProcessor.Factory());
processors.put(RemoveProcessor.TYPE, new RemoveProcessor.Factory(templateService));
processors.put(SplitProcessor.TYPE, new SplitProcessor.Factory());
processors.put(JoinProcessor.TYPE, new JoinProcessor.Factory());
processors.put(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory());
processors.put(LowercaseProcessor.TYPE, new LowercaseProcessor.Factory());
processors.put(TrimProcessor.TYPE, new TrimProcessor.Factory());
processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory());
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(templateService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory());
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory());
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(builtinPatterns));
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(scriptService));
return Collections.unmodifiableMap(processors);
}
// Code for loading built-in grok patterns packaged with the jar file:

View File

@ -72,7 +72,8 @@ public final class JoinProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
@Override
public JoinProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public JoinProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator");
return new JoinProcessor(processorTag, field, separator);

View File

@ -64,7 +64,8 @@ public final class RemoveProcessor extends AbstractProcessor {
}
@Override
public RemoveProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public RemoveProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new RemoveProcessor(processorTag, templateService.compile(field));
}

View File

@ -77,7 +77,8 @@ public final class RenameProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
@Override
public RenameProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public RenameProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field");
return new RenameProcessor(processorTag, field, targetField);

View File

@ -86,7 +86,8 @@ public final class ScriptProcessor extends AbstractProcessor {
}
@Override
public ScriptProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public ScriptProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = readOptionalStringProperty(TYPE, processorTag, config, "field");
String lang = readStringProperty(TYPE, processorTag, config, "lang");
String inline = readOptionalStringProperty(TYPE, processorTag, config, "inline");

View File

@ -84,7 +84,8 @@ public final class SetProcessor extends AbstractProcessor {
}
@Override
public SetProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public SetProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true);

View File

@ -114,7 +114,8 @@ public final class SortProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
@Override
public SortProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public SortProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, FIELD);
try {
SortOrder direction = SortOrder.fromString(

View File

@ -74,7 +74,8 @@ public final class SplitProcessor extends AbstractProcessor {
public static class Factory implements Processor.Factory {
@Override
public SplitProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public SplitProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator"));
}

View File

@ -51,7 +51,7 @@ public class AppendProcessorFactoryTests extends ESTestCase {
}
config.put("value", value);
String processorTag = randomAsciiOfLength(10);
AppendProcessor appendProcessor = factory.create(processorTag, config);
AppendProcessor appendProcessor = factory.create(null, processorTag, config);
assertThat(appendProcessor.getTag(), equalTo(processorTag));
assertThat(appendProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
assertThat(appendProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo(value));
@ -61,7 +61,7 @@ public class AppendProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("value", "value1");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
@ -72,7 +72,7 @@ public class AppendProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
@ -84,7 +84,7 @@ public class AppendProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("value", null);
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));

View File

@ -38,7 +38,7 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("type", type.toString());
String processorTag = randomAsciiOfLength(10);
ConvertProcessor convertProcessor = factory.create(processorTag, config);
ConvertProcessor convertProcessor = factory.create(null, processorTag, config);
assertThat(convertProcessor.getTag(), equalTo(processorTag));
assertThat(convertProcessor.getField(), equalTo("field1"));
assertThat(convertProcessor.getTargetField(), equalTo("field1"));
@ -52,7 +52,7 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("type", type);
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), Matchers.equalTo("[type] type [" + type + "] not supported, cannot convert field."));
@ -68,7 +68,7 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
String type = "type-" + randomAsciiOfLengthBetween(1, 10);
config.put("type", type);
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
@ -80,7 +80,7 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), Matchers.equalTo("[type] required property is missing"));
@ -95,7 +95,7 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
config.put("target_field", "field2");
config.put("type", type.toString());
String processorTag = randomAsciiOfLength(10);
ConvertProcessor convertProcessor = factory.create(processorTag, config);
ConvertProcessor convertProcessor = factory.create(null, processorTag, config);
assertThat(convertProcessor.getTag(), equalTo(processorTag));
assertThat(convertProcessor.getField(), equalTo("field1"));
assertThat(convertProcessor.getTargetField(), equalTo("field2"));

View File

@ -36,7 +36,7 @@ public class DateIndexNameFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("date_rounding", "y");
DateIndexNameProcessor processor = factory.create(null, config);
DateIndexNameProcessor processor = factory.create(null, null, config);
assertThat(processor.getDateFormats().size(), Matchers.equalTo(1));
assertThat(processor.getField(), Matchers.equalTo("_field"));
assertThat(processor.getIndexNamePrefix(), Matchers.equalTo(""));
@ -53,7 +53,7 @@ public class DateIndexNameFactoryTests extends ESTestCase {
config.put("date_rounding", "y");
config.put("date_formats", Arrays.asList("UNIX", "UNIX_MS"));
DateIndexNameProcessor processor = factory.create(null, config);
DateIndexNameProcessor processor = factory.create(null, null, config);
assertThat(processor.getDateFormats().size(), Matchers.equalTo(2));
config = new HashMap<>();
@ -62,7 +62,7 @@ public class DateIndexNameFactoryTests extends ESTestCase {
config.put("date_rounding", "y");
config.put("index_name_format", "yyyyMMdd");
processor = factory.create(null, config);
processor = factory.create(null, null, config);
assertThat(processor.getIndexNameFormat(), Matchers.equalTo("yyyyMMdd"));
config = new HashMap<>();
@ -71,7 +71,7 @@ public class DateIndexNameFactoryTests extends ESTestCase {
config.put("date_rounding", "y");
config.put("timezone", "+02:00");
processor = factory.create(null, config);
processor = factory.create(null, null, config);
assertThat(processor.getTimezone(), Matchers.equalTo(DateTimeZone.forOffsetHours(2)));
config = new HashMap<>();
@ -79,7 +79,7 @@ public class DateIndexNameFactoryTests extends ESTestCase {
config.put("index_name_prefix", "_prefix");
config.put("date_rounding", "y");
processor = factory.create(null, config);
processor = factory.create(null, null, config);
assertThat(processor.getIndexNamePrefix(), Matchers.equalTo("_prefix"));
}
@ -87,12 +87,12 @@ public class DateIndexNameFactoryTests extends ESTestCase {
DateIndexNameProcessor.Factory factory = new DateIndexNameProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("date_rounding", "y");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
config.clear();
config.put("field", "_field");
e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), Matchers.equalTo("[date_rounding] required property is missing"));
}

View File

@ -41,7 +41,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("field", sourceField);
config.put("formats", Collections.singletonList("dd/MM/yyyyy"));
String processorTag = randomAsciiOfLength(10);
DateProcessor processor = factory.create(processorTag, config);
DateProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo(sourceField));
assertThat(processor.getTargetField(), equalTo(DateProcessor.DEFAULT_TARGET_FIELD));
@ -58,7 +58,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("formats", Collections.singletonList("dd/MM/yyyyy"));
try {
factory.create(null, config);
factory.create(null, null, config);
fail("processor creation should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[field] required property is missing"));
@ -74,7 +74,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("target_field", targetField);
try {
factory.create(null, config);
factory.create(null, null, config);
fail("processor creation should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[formats] required property is missing"));
@ -90,7 +90,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
Locale locale = randomLocale(random());
config.put("locale", locale.toLanguageTag());
DateProcessor processor = factory.create(null, config);
DateProcessor processor = factory.create(null, null, config);
assertThat(processor.getLocale().toLanguageTag(), equalTo(locale.toLanguageTag()));
}
@ -102,7 +102,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("formats", Collections.singletonList("dd/MM/yyyyy"));
config.put("locale", "invalid_locale");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("should fail with invalid locale");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Invalid language tag specified: invalid_locale"));
@ -118,7 +118,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
DateTimeZone timezone = randomDateTimeZone();
config.put("timezone", timezone.getID());
DateProcessor processor = factory.create(null, config);
DateProcessor processor = factory.create(null, null, config);
assertThat(processor.getTimezone(), equalTo(timezone));
}
@ -130,7 +130,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("match_formats", Collections.singletonList("dd/MM/yyyyy"));
config.put("timezone", "invalid_timezone");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("invalid timezone should fail");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("The datetime zone id 'invalid_timezone' is not recognised"));
@ -144,7 +144,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("field", sourceField);
config.put("formats", Arrays.asList("dd/MM/yyyy", "dd-MM-yyyy"));
DateProcessor processor = factory.create(null, config);
DateProcessor processor = factory.create(null, null, config);
assertThat(processor.getFormats(), equalTo(Arrays.asList("dd/MM/yyyy", "dd-MM-yyyy")));
}
@ -156,7 +156,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("formats", "dd/MM/yyyy");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("processor creation should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[formats] property isn't a list, but of type [java.lang.String]"));
@ -172,7 +172,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
config.put("target_field", targetField);
config.put("formats", Arrays.asList("dd/MM/yyyy", "dd-MM-yyyy"));
DateProcessor processor = factory.create(null, config);
DateProcessor processor = factory.create(null, null, config);
assertThat(processor.getTargetField(), equalTo(targetField));
}
}

View File

@ -43,7 +43,7 @@ public class FailProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("message", "error");
String processorTag = randomAsciiOfLength(10);
FailProcessor failProcessor = factory.create(processorTag, config);
FailProcessor failProcessor = factory.create(null, processorTag, config);
assertThat(failProcessor.getTag(), equalTo(processorTag));
assertThat(failProcessor.getMessage().execute(Collections.emptyMap()), equalTo("error"));
}
@ -51,7 +51,7 @@ public class FailProcessorFactoryTests extends ESTestCase {
public void testCreateMissingMessageField() throws Exception {
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[message] required property is missing"));

View File

@ -21,7 +21,7 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
@ -36,16 +36,15 @@ import static org.mockito.Mockito.mock;
public class ForEachProcessorFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
Processor processor = new TestProcessor(ingestDocument -> {});
builder.registerProcessor("_name", (registry) -> (tag, config) -> processor);
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(registry);
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
ForEachProcessor forEachProcessor = forEachFactory.create(null, config);
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), Matchers.equalTo("_field"));
assertThat(forEachProcessor.getProcessors().size(), Matchers.equalTo(1));
@ -54,7 +53,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
config = new HashMap<>();
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
try {
forEachFactory.create(null, config);
forEachFactory.create(registry, null, config);
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
@ -63,7 +62,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
config = new HashMap<>();
config.put("field", "_field");
try {
forEachFactory.create(null, config);
forEachFactory.create(registry, null, config);
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), Matchers.equalTo("[processors] required property is missing"));

View File

@ -38,7 +38,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("patterns", Collections.singletonList("(?<foo>\\w+)"));
String processorTag = randomAsciiOfLength(10);
GrokProcessor processor = factory.create(processorTag, config);
GrokProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getMatchField(), equalTo("_field"));
assertThat(processor.getGrok(), notNullValue());
@ -48,7 +48,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("patterns", Collections.singletonList("(?<foo>\\w+)"));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create("tag", config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
@ -56,7 +56,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create("tag", config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[patterns] required property is missing"));
}
@ -65,7 +65,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
config.put("patterns", Collections.emptyList());
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create("tag", config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[patterns] List of patterns must not be empty"));
}
@ -76,7 +76,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!"));
config.put("pattern_definitions", Collections.singletonMap("MY_PATTERN", "foo"));
GrokProcessor processor = factory.create(null, config);
GrokProcessor processor = factory.create(null, null, config);
assertThat(processor.getMatchField(), equalTo("_field"));
assertThat(processor.getGrok(), notNullValue());
assertThat(processor.getGrok().match("foo!"), equalTo(true));
@ -87,7 +87,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("patterns", Collections.singletonList("["));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create("tag", config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[patterns] Invalid regex pattern found in: [[]. premature end of char-class"));
}
@ -97,7 +97,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!"));
config.put("pattern_definitions", Collections.singletonMap("MY_PATTERN", "["));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create("tag", config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(),
equalTo("[patterns] Invalid regex pattern found in: [%{MY_PATTERN:name}!]. premature end of char-class"));
}

View File

@ -37,7 +37,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
config.put("pattern", "\\.");
config.put("replacement", "-");
String processorTag = randomAsciiOfLength(10);
GsubProcessor gsubProcessor = factory.create(processorTag, config);
GsubProcessor gsubProcessor = factory.create(null, processorTag, config);
assertThat(gsubProcessor.getTag(), equalTo(processorTag));
assertThat(gsubProcessor.getField(), equalTo("field1"));
assertThat(gsubProcessor.getPattern().toString(), equalTo("\\."));
@ -50,7 +50,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
config.put("pattern", "\\.");
config.put("replacement", "-");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
@ -63,7 +63,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("replacement", "-");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[pattern] required property is missing"));
@ -76,7 +76,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("pattern", "\\.");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[replacement] required property is missing"));
@ -90,7 +90,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
config.put("pattern", "[");
config.put("replacement", "-");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[pattern] Invalid regex pattern. Unclosed character class"));

View File

@ -35,7 +35,7 @@ public class JoinProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("separator", "-");
String processorTag = randomAsciiOfLength(10);
JoinProcessor joinProcessor = factory.create(processorTag, config);
JoinProcessor joinProcessor = factory.create(null, processorTag, config);
assertThat(joinProcessor.getTag(), equalTo(processorTag));
assertThat(joinProcessor.getField(), equalTo("field1"));
assertThat(joinProcessor.getSeparator(), equalTo("-"));
@ -46,7 +46,7 @@ public class JoinProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("separator", "-");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
@ -58,7 +58,7 @@ public class JoinProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[separator] required property is missing"));

View File

@ -34,7 +34,7 @@ public class LowercaseProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
LowercaseProcessor uppercaseProcessor = (LowercaseProcessor)factory.create(processorTag, config);
LowercaseProcessor uppercaseProcessor = (LowercaseProcessor)factory.create(null, processorTag, config);
assertThat(uppercaseProcessor.getTag(), equalTo(processorTag));
assertThat(uppercaseProcessor.getField(), equalTo("field1"));
}
@ -43,7 +43,7 @@ public class LowercaseProcessorFactoryTests extends ESTestCase {
LowercaseProcessor.Factory factory = new LowercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));

View File

@ -43,7 +43,7 @@ public class RemoveProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
RemoveProcessor removeProcessor = factory.create(processorTag, config);
RemoveProcessor removeProcessor = factory.create(null, processorTag, config);
assertThat(removeProcessor.getTag(), equalTo(processorTag));
assertThat(removeProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
}
@ -51,7 +51,7 @@ public class RemoveProcessorFactoryTests extends ESTestCase {
public void testCreateMissingField() throws Exception {
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));

View File

@ -35,7 +35,7 @@ public class RenameProcessorFactoryTests extends ESTestCase {
config.put("field", "old_field");
config.put("target_field", "new_field");
String processorTag = randomAsciiOfLength(10);
RenameProcessor renameProcessor = factory.create(processorTag, config);
RenameProcessor renameProcessor = factory.create(null, processorTag, config);
assertThat(renameProcessor.getTag(), equalTo(processorTag));
assertThat(renameProcessor.getField(), equalTo("old_field"));
assertThat(renameProcessor.getTargetField(), equalTo("new_field"));
@ -46,7 +46,7 @@ public class RenameProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("target_field", "new_field");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
@ -58,7 +58,7 @@ public class RenameProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "old_field");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));

View File

@ -25,6 +25,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -55,8 +56,7 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
configMap.put("lang", "mockscript");
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> factory.create(randomAsciiOfLength(10), configMap));
() -> factory.create(null, randomAsciiOfLength(10), configMap));
assertThat(exception.getMessage(), is("[null] Only one of [file], [id], or [inline] may be configured"));
}
@ -66,7 +66,7 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
configMap.put("lang", "mockscript");
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> factory.create(randomAsciiOfLength(10), configMap));
() -> factory.create(null, randomAsciiOfLength(10), configMap));
assertThat(exception.getMessage(), is("[null] Need [file], [id], or [inline] parameter to refer to scripts"));
}

View File

@ -44,7 +44,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("value", "value1");
String processorTag = randomAsciiOfLength(10);
SetProcessor setProcessor = factory.create(processorTag, config);
SetProcessor setProcessor = factory.create(null, processorTag, config);
assertThat(setProcessor.getTag(), equalTo(processorTag));
assertThat(setProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
assertThat(setProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo("value1"));
@ -58,7 +58,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
config.put("value", "value1");
config.put("override", overrideEnabled);
String processorTag = randomAsciiOfLength(10);
SetProcessor setProcessor = factory.create(processorTag, config);
SetProcessor setProcessor = factory.create(null, processorTag, config);
assertThat(setProcessor.getTag(), equalTo(processorTag));
assertThat(setProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
assertThat(setProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo("value1"));
@ -69,7 +69,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("value", "value1");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
@ -80,7 +80,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
@ -92,7 +92,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("value", null);
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));

View File

@ -35,7 +35,7 @@ public class SplitProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("separator", "\\.");
String processorTag = randomAsciiOfLength(10);
SplitProcessor splitProcessor = factory.create(processorTag, config);
SplitProcessor splitProcessor = factory.create(null, processorTag, config);
assertThat(splitProcessor.getTag(), equalTo(processorTag));
assertThat(splitProcessor.getField(), equalTo("field1"));
assertThat(splitProcessor.getSeparator(), equalTo("\\."));
@ -46,7 +46,7 @@ public class SplitProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("separator", "\\.");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
@ -58,7 +58,7 @@ public class SplitProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[separator] required property is missing"));

View File

@ -84,7 +84,7 @@ public class SplitProcessorTests extends ESTestCase {
Map<String, Object> splitConfig = new HashMap<>();
splitConfig.put("field", "flags");
splitConfig.put("separator", "\\|");
Processor splitProcessor = (new SplitProcessor.Factory()).create("tag", splitConfig);
Processor splitProcessor = (new SplitProcessor.Factory()).create(null, null, splitConfig);
Map<String, Object> source = new HashMap<>();
source.put("flags", "new|hot|super|fun|interesting");
IngestDocument ingestDocument = new IngestDocument(source, new HashMap<>());

View File

@ -34,7 +34,7 @@ public class TrimProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
TrimProcessor uppercaseProcessor = (TrimProcessor)factory.create(processorTag, config);
TrimProcessor uppercaseProcessor = (TrimProcessor)factory.create(null, processorTag, config);
assertThat(uppercaseProcessor.getTag(), equalTo(processorTag));
assertThat(uppercaseProcessor.getField(), equalTo("field1"));
}
@ -43,7 +43,7 @@ public class TrimProcessorFactoryTests extends ESTestCase {
TrimProcessor.Factory factory = new TrimProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));

View File

@ -34,7 +34,7 @@ public class UppercaseProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
UppercaseProcessor uppercaseProcessor = (UppercaseProcessor)factory.create(processorTag, config);
UppercaseProcessor uppercaseProcessor = (UppercaseProcessor)factory.create(null, processorTag, config);
assertThat(uppercaseProcessor.getTag(), equalTo(processorTag));
assertThat(uppercaseProcessor.getField(), equalTo("field1"));
}
@ -43,7 +43,7 @@ public class UppercaseProcessorFactoryTests extends ESTestCase {
UppercaseProcessor.Factory factory = new UppercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, config);
factory.create(null, null, config);
fail("factory create should have failed");
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));

View File

@ -155,7 +155,8 @@ public final class AttachmentProcessor extends AbstractProcessor {
static final Set<Property> DEFAULT_PROPERTIES = EnumSet.allOf(Property.class);
@Override
public AttachmentProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public AttachmentProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "attachment");
List<String> properyNames = readOptionalList(TYPE, processorTag, config, "properties");

View File

@ -19,15 +19,21 @@
package org.elasticsearch.ingest.attachment;
import org.elasticsearch.node.NodeModule;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
public class IngestAttachmentPlugin extends Plugin implements IngestPlugin {
public class IngestAttachmentPlugin extends Plugin {
public void onModule(NodeModule nodeModule) throws IOException {
nodeModule.registerProcessor(AttachmentProcessor.TYPE,
(registry) -> new AttachmentProcessor.Factory());
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
return Collections.singletonMap(AttachmentProcessor.TYPE, new AttachmentProcessor.Factory());
}
}

View File

@ -47,7 +47,7 @@ public class AttachmentProcessorFactoryTests extends ESTestCase {
String processorTag = randomAsciiOfLength(10);
AttachmentProcessor processor = factory.create(processorTag, config);
AttachmentProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("attachment"));
@ -61,7 +61,7 @@ public class AttachmentProcessorFactoryTests extends ESTestCase {
config.put("indexed_chars", indexedChars);
String processorTag = randomAsciiOfLength(10);
AttachmentProcessor processor = factory.create(processorTag, config);
AttachmentProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getIndexedChars(), is(indexedChars));
}
@ -70,7 +70,7 @@ public class AttachmentProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("target_field", "_field");
AttachmentProcessor processor = factory.create(null, config);
AttachmentProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_field"));
}
@ -87,7 +87,7 @@ public class AttachmentProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("properties", fieldNames);
AttachmentProcessor processor = factory.create(null, config);
AttachmentProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getProperties(), equalTo(properties));
}
@ -97,7 +97,7 @@ public class AttachmentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("properties", Collections.singletonList("invalid"));
try {
factory.create(null, config);
factory.create(null, null, config);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[properties] illegal field option [invalid]"));
@ -111,7 +111,7 @@ public class AttachmentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("properties", "invalid");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));

View File

@ -19,25 +19,6 @@
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.City;
import com.maxmind.geoip2.record.Continent;
import com.maxmind.geoip2.record.Country;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Subdivision;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
@ -51,6 +32,23 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.City;
import com.maxmind.geoip2.record.Continent;
import com.maxmind.geoip2.record.Country;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Subdivision;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
@ -231,7 +229,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
}
@Override
public GeoIpProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public GeoIpProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb.gz");
@ -239,7 +238,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
DatabaseReader databaseReader = databaseReaders.get(databaseFile);
if (databaseReader == null) {
throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist");
throw newConfigurationException(TYPE, processorTag,
"database_file", "database file [" + databaseFile + "] doesn't exist");
}
String databaseType = databaseReader.getMetadata().getDatabaseType();

View File

@ -19,11 +19,6 @@
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@ -38,20 +33,36 @@ import java.util.Map;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
public class IngestGeoIpPlugin extends Plugin implements Closeable {
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
private Map<String, DatabaseReader> databaseReaders;
public void onModule(NodeModule nodeModule) throws IOException {
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
if (databaseReaders != null) {
throw new IllegalStateException("called onModule twice for geoip plugin!!");
}
Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip");
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (registry) -> new GeoIpProcessor.Factory(databaseReaders));
Path geoIpConfigDirectory = env.configFile().resolve("ingest-geoip");
try {
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
}
public static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}

View File

@ -75,7 +75,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
String processorTag = randomAsciiOfLength(10);
GeoIpProcessor processor = factory.create(processorTag, config);
GeoIpProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
@ -91,7 +91,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("database_file", "GeoLite2-Country.mmdb.gz");
String processorTag = randomAsciiOfLength(10);
GeoIpProcessor processor = factory.create(processorTag, config);
GeoIpProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
@ -104,7 +105,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("target_field", "_field");
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_field"));
}
@ -114,7 +115,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
@ -130,7 +131,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
String cityProperty = RandomPicks.randomFrom(Randomness.get(), cityOnlyProperties).toString();
config.put("properties", Collections.singletonList(cityProperty));
try {
factory.create(null, config);
factory.create(null, null, config);
fail("Exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[properties] illegal property value [" + cityProperty +
@ -145,7 +146,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("database_file", "does-not-exist.mmdb.gz");
try {
factory.create(null, config);
factory.create(null, null, config);
fail("Exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb.gz] doesn't exist"));
@ -166,7 +167,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("properties", fieldNames);
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getProperties(), equalTo(properties));
}
@ -178,7 +179,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("properties", Collections.singletonList("invalid"));
try {
factory.create(null, config);
factory.create(null, null, config);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[properties] illegal property value [invalid]. valid values are [IP, COUNTRY_ISO_CODE, " +
@ -189,7 +190,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("properties", "invalid");
try {
factory.create("tag", config);
factory.create(null, null, config);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));

View File

@ -20,8 +20,14 @@
package org.elasticsearch.ingest.useragent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.io.InputStream;
@ -34,28 +40,31 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
public class IngestUserAgentPlugin extends Plugin {
public class IngestUserAgentPlugin extends Plugin implements IngestPlugin {
private final Setting<Long> CACHE_SIZE_SETTING = Setting.longSetting("ingest.useragent.cache_size", 1000, 0,
Setting.Property.NodeScope);
static final String DEFAULT_PARSER_NAME = "_default_";
public void onModule(NodeModule nodeModule) throws IOException {
Path userAgentConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-useragent");
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
Path userAgentConfigDirectory = env.configFile().resolve("ingest-useragent");
if (Files.exists(userAgentConfigDirectory) == false && Files.isDirectory(userAgentConfigDirectory)) {
throw new IllegalStateException(
"the user agent directory [" + userAgentConfigDirectory + "] containing the regex file doesn't exist");
"the user agent directory [" + userAgentConfigDirectory + "] containing the regex file doesn't exist");
}
long cacheSize = CACHE_SIZE_SETTING.get(nodeModule.getNode().settings());
UserAgentCache cache = new UserAgentCache(cacheSize);
Map<String, UserAgentParser> userAgentParsers = createUserAgentParsers(userAgentConfigDirectory, cache);
nodeModule.registerProcessor(UserAgentProcessor.TYPE, (registry) -> new UserAgentProcessor.Factory(userAgentParsers));
long cacheSize = CACHE_SIZE_SETTING.get(env.settings());
Map<String, UserAgentParser> userAgentParsers;
try {
userAgentParsers = createUserAgentParsers(userAgentConfigDirectory, new UserAgentCache(cacheSize));
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(UserAgentProcessor.TYPE, new UserAgentProcessor.Factory(userAgentParsers));
}
static Map<String, UserAgentParser> createUserAgentParsers(Path userAgentConfigDirectory, UserAgentCache cache) throws IOException {

View File

@ -194,7 +194,8 @@ public class UserAgentProcessor extends AbstractProcessor {
}
@Override
public UserAgentProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public UserAgentProcessor create(Map<String, Processor.Factory> factories, String processorTag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "useragent");
String regexFilename = readStringProperty(TYPE, processorTag, config, "regex_file", IngestUserAgentPlugin.DEFAULT_PARSER_NAME);

View File

@ -81,7 +81,7 @@ public class UserAgentProcessorFactoryTests extends ESTestCase {
String processorTag = randomAsciiOfLength(10);
UserAgentProcessor processor = factory.create(processorTag, config);
UserAgentProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("useragent"));
@ -98,7 +98,7 @@ public class UserAgentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("target_field", "_target_field");
UserAgentProcessor processor = factory.create(null, config);
UserAgentProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_target_field"));
}
@ -110,7 +110,7 @@ public class UserAgentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("regex_file", regexWithoutDevicesFilename);
UserAgentProcessor processor = factory.create(null, config);
UserAgentProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getUaParser().getUaPatterns().size(), greaterThan(0));
assertThat(processor.getUaParser().getOsPatterns().size(), greaterThan(0));
@ -124,7 +124,7 @@ public class UserAgentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("regex_file", "does-not-exist.yaml");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[regex_file] regex file [does-not-exist.yaml] doesn't exist (has to exist at node startup)"));
}
@ -144,7 +144,7 @@ public class UserAgentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("properties", fieldNames);
UserAgentProcessor processor = factory.create(null, config);
UserAgentProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getProperties(), equalTo(properties));
}
@ -156,7 +156,7 @@ public class UserAgentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("properties", Collections.singletonList("invalid"));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[properties] illegal property value [invalid]. valid values are [NAME, MAJOR, MINOR, "
+ "PATCH, OS, OS_NAME, OS_MAJOR, OS_MINOR, DEVICE, BUILD]"));
}
@ -168,7 +168,7 @@ public class UserAgentProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("properties", "invalid");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));
}
}

View File

@ -19,22 +19,27 @@
package org.elasticsearch.ingest;
import org.elasticsearch.node.NodeModule;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
/**
* Adds an ingest processor to be used in tests.
*/
public class IngestTestPlugin extends Plugin {
public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor("test", (registry) -> (tag, config) ->
new TestProcessor("id", "test", doc -> {
doc.setFieldValue("processed", true);
if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");
}
})
);
public class IngestTestPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
return Collections.singletonMap("test", (factories, tag, config) ->
new TestProcessor("id", "test", doc -> {
doc.setFieldValue("processed", true);
if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");
}
}));
}
}

View File

@ -66,7 +66,7 @@ public class TestProcessor implements Processor {
public static final class Factory implements Processor.Factory {
@Override
public TestProcessor create(String processorTag, Map<String, Object> config) throws Exception {
public TestProcessor create(Map<String, Processor.Factory> registry, String processorTag, Map<String, Object> config) throws Exception {
return new TestProcessor(processorTag, "test-processor", ingestDocument -> {});
}
}