Tests pass, started removing generics from processor factory

This commit is contained in:
Ryan Ernst 2016-06-30 01:49:22 -07:00
parent f4519c44b7
commit 08b3b6264e
50 changed files with 259 additions and 360 deletions

View File

@ -141,7 +141,7 @@ public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineReque
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorRegistry());
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

View File

@ -25,14 +25,15 @@ import java.util.Map;
* A processor implementation may modify the data belonging to a document.
* Whether changes are made and what exactly is modified is up to the implementation.
*/
public abstract class AbstractProcessorFactory<P extends Processor> implements Processor.Factory<P> {
public abstract class AbstractProcessorFactory implements Processor.Factory {
public static final String TAG_KEY = "tag";
@Override
public P create(ProcessorsRegistry registry, Map<String, Object> config) throws Exception {
public Processor create(Map<String, Processor.Factory> registry, Map<String, Object> config) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
return doCreate(registry, tag, config);
}
protected abstract P doCreate(ProcessorsRegistry registry, String tag, Map<String, Object> config) throws Exception;
protected abstract Processor doCreate(Map<String, Processor.Factory> registry, String tag,
Map<String, Object> config) throws Exception;
}

View File

@ -234,12 +234,12 @@ public final class ConfigurationUtils {
}
public static List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs,
ProcessorsRegistry processorRegistry) throws Exception {
Map<String, Processor.Factory> processorFactories) throws Exception {
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue()));
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
}
}
}
@ -247,15 +247,15 @@ public final class ConfigurationUtils {
return processors;
}
private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorRegistry.getProcessorFactory(type);
private static Processor readProcessor(Map<String, Processor.Factory> processorFactories, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorFactories.get(type);
if (factory != null) {
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
Processor processor = factory.create(processorRegistry, config);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
Processor processor = factory.create(processorFactories, config);
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(processor.getType(), processor.getTag(), Pipeline.ON_FAILURE_KEY,

View File

@ -19,16 +19,18 @@
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
/**
* Holder class for several ingest related services.
@ -37,11 +39,20 @@ public class IngestService {
private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;
private final ProcessorsRegistry.Builder processorsRegistryBuilder;
public IngestService(Settings settings, ThreadPool threadPool, ProcessorsRegistry.Builder processorsRegistryBuilder) {
this.processorsRegistryBuilder = processorsRegistryBuilder;
this.pipelineStore = new PipelineStore(settings);
public IngestService(Settings settings, ThreadPool threadPool,
Environment env, ScriptService scriptService, List<IngestPlugin> ingestPlugins) {
final TemplateService templateService = new InternalTemplateService(scriptService);
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (IngestPlugin ingestPlugin : ingestPlugins) {
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(env, scriptService, templateService);
for (Map.Entry<String, Processor.Factory> entry : newProcessors.entrySet()) {
if (processorFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered");
}
}
}
this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories));
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
}
@ -53,12 +64,8 @@ public class IngestService {
return pipelineExecutionService;
}
public void buildProcessorsFactoryRegistry(ScriptService scriptService, ClusterService clusterService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService, clusterService);
}
public IngestInfo info() {
Map<String, Processor.Factory> processorFactories = pipelineStore.getProcessorRegistry().getProcessorFactories();
Map<String, Processor.Factory> processorFactories = pipelineStore.getProcessorFactories();
List<ProcessorInfo> processorInfoList = new ArrayList<>(processorFactories.size());
for (Map.Entry<String, Processor.Factory> entry : processorFactories.entrySet()) {
processorInfoList.add(new ProcessorInfo(entry.getKey()));

View File

@ -98,13 +98,13 @@ public final class Pipeline {
public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, ProcessorsRegistry processorRegistry) throws Exception {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorFactories) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("pipeline [" + id +
"] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));

View File

@ -19,6 +19,12 @@
package org.elasticsearch.ingest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@ -37,20 +43,11 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ScriptService;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PipelineStore extends AbstractComponent implements ClusterStateListener {
private final Pipeline.Factory factory = new Pipeline.Factory();
private ProcessorsRegistry processorRegistry;
private final Map<String, Processor.Factory> processorFactories;
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
@ -58,13 +55,9 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
volatile Map<String, Pipeline> pipelines = new HashMap<>();
public PipelineStore(Settings settings) {
public PipelineStore(Settings settings, Map<String, Processor.Factory> processorFactories) {
super(settings);
}
public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService,
ClusterService clusterService) {
this.processorRegistry = processorsRegistryBuilder.build(scriptService, clusterService);
this.processorFactories = processorFactories;
}
@Override
@ -81,7 +74,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
Map<String, Pipeline> pipelines = new HashMap<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorRegistry));
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
} catch (ElasticsearchParseException e) {
throw e;
} catch (Exception e) {
@ -157,7 +150,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
}
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorRegistry);
Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories);
List<IllegalArgumentException> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
@ -194,8 +187,8 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
return pipelines.get(id);
}
public ProcessorsRegistry getProcessorRegistry() {
return processorRegistry;
public Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}
/**

View File

@ -45,14 +45,17 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
interface Factory<P extends Processor> {
interface Factory {
/**
* Creates a processor based on the specified map of maps config.
*
* Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can
* verify if all configurations settings have been used.
* @param processorFactories Other processors which may be created inside this processor
* @param config The configuration for the processor
*
* <b>Note:</b> Implementations are responsible for removing the used configuration keys, so that after
* creating a pipeline ingest can verify if all configurations settings have been used.
*/
P create(ProcessorsRegistry registry, Map<String, Object> config) throws Exception;
Processor create(Map<String, Processor.Factory> processorFactories, Map<String, Object> config) throws Exception;
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
public final class ProcessorsRegistry {
private final Map<String, Processor.Factory> processorFactories;
public ProcessorsRegistry(Map<String, Processor.Factory> processors) {
this.processorFactories = Collections.unmodifiableMap(processors);
}
public Processor.Factory getProcessorFactory(String name) {
return processorFactories.get(name);
}
// For testing:
Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}
}

View File

@ -85,12 +85,14 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
@ -255,6 +257,9 @@ public class Node implements Closeable {
final TribeService tribeService = new TribeService(settings, clusterService);
resourcesToClose.add(tribeService);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
final IngestService ingestService = new IngestService(settings, threadPool, environment,
scriptModule.getScriptService(), pluginsService.filterPlugins(IngestPlugin.class));
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
@ -293,6 +298,7 @@ public class Node implements Closeable {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
}
);
injector = modules.createInjector();

View File

@ -20,28 +20,17 @@
package org.elasticsearch.node;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService;
import java.util.function.Function;
/**
*
*/
public class NodeModule extends AbstractModule {
private final Node node;
private final MonitorService monitorService;
private final ProcessorsRegistry.Builder processorsRegistryBuilder;
public NodeModule(Node node, MonitorService monitorService) {
this.node = node;
this.monitorService = monitorService;
this.processorsRegistryBuilder = new ProcessorsRegistry.Builder();
}
@Override
@ -49,20 +38,5 @@ public class NodeModule extends AbstractModule {
bind(Node.class).toInstance(node);
bind(MonitorService.class).toInstance(monitorService);
bind(NodeService.class).asEagerSingleton();
bind(ProcessorsRegistry.Builder.class).toInstance(processorsRegistryBuilder);
}
/**
* Returns the node
*/
public Node getNode() {
return node;
}
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, Function<ProcessorsRegistry, Processor.Factory<?>> provider) {
processorsRegistryBuilder.registerProcessor(type, provider);
}
}

View File

@ -19,6 +19,11 @@
package org.elasticsearch.node.service;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
@ -35,18 +40,12 @@ import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@ -76,7 +75,7 @@ public class NodeService extends AbstractComponent implements Closeable {
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService,
Discovery discovery, TransportService transportService, IndicesService indicesService,
PluginsService pluginService, CircuitBreakerService circuitBreakerService,
ProcessorsRegistry.Builder processorsRegistryBuilder, ClusterService clusterService, SettingsFilter settingsFilter) {
IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
@ -86,17 +85,17 @@ public class NodeService extends AbstractComponent implements Closeable {
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
this.clusterService = clusterService;
this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder);
this.ingestService = ingestService;
this.settingsFilter = settingsFilter;
clusterService.add(ingestService.getPipelineStore());
clusterService.add(ingestService.getPipelineExecutionService());
}
// can not use constructor injection or there will be a circular dependency
// nocommit: try removing this...
@Inject(optional = true)
public void setScriptService(ScriptService scriptService) {
this.scriptService = scriptService;
this.ingestService.buildProcessorsFactoryRegistry(scriptService, clusterService);
}
public void setHttpServer(@Nullable HttpServer httpServer) {

View File

@ -21,9 +21,8 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
@ -59,12 +58,11 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder();
processorRegistryBuilder.registerProcessor("mock_processor", ((registry) -> mock(Processor.Factory.class)));
ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(mock(ScriptService.class), mock(ClusterService.class));
Map<String, Processor.Factory> registry =
Collections.singletonMap("mock_processor", (factories, config) -> processor);
store = mock(PipelineStore.class);
when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.getProcessorRegistry()).thenReturn(processorRegistry);
when(store.getProcessorFactories()).thenReturn(registry);
}
public void testParseUsingPipelineStore() throws Exception {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.BytesRestResponse;
@ -45,6 +46,7 @@ import org.junit.Before;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
public class HttpServerTests extends ESTestCase {
@ -74,8 +76,9 @@ public class HttpServerTests extends ESTestCase {
ClusterService clusterService = new ClusterService(Settings.EMPTY,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null,
clusterService, null);
IngestService ingestService = new IngestService(settings, null, null, null, Collections.emptyList());
NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null,
ingestService, clusterService, null);
httpServer = new HttpServer(settings, httpServerTransport, restController, nodeService, circuitBreakerService);
httpServer.start();
}

View File

@ -19,14 +19,6 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -34,6 +26,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
@ -97,10 +95,8 @@ public class ConfigurationUtilsTests extends ESTestCase {
public void testReadProcessors() throws Exception {
Processor processor = mock(Processor.class);
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
builder.registerProcessor("test_processor", (registry) -> config -> processor);
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));
Map<String, Processor.Factory> registry =
Collections.singletonMap("test_processor", (factories, config) -> processor);
List<Map<String, Map<String, Object>>> config = new ArrayList<>();
Map<String, Object> emptyConfig = Collections.emptyMap();

View File

@ -46,7 +46,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -62,7 +62,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
Pipeline.Factory factory = new Pipeline.Factory();
try {
factory.create("_id", pipelineConfig, createProcessorRegistry(Collections.emptyMap()));
factory.create("_id", pipelineConfig, Collections.emptyMap());
fail("should fail, missing required [processors] field");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@ -76,7 +76,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -93,7 +93,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined"));
}
@ -105,7 +105,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty"));
}
@ -114,7 +114,7 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("ignore_failure", true);
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
@ -139,7 +139,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
@ -152,7 +152,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -169,12 +169,4 @@ public class PipelineFactoryTests extends ESTestCase {
List<Processor> flattened = pipeline.flattenAllProcessors();
assertThat(flattened.size(), equalTo(4));
}
private ProcessorsRegistry createProcessorRegistry(Map<String, Processor.Factory> processorRegistry) {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
for (Map.Entry<String, Processor.Factory> entry : processorRegistry.entrySet()) {
builder.registerProcessor(entry.getKey(), ((registry) -> entry.getValue()));
}
return builder.build(mock(ScriptService.class), mock(ClusterService.class));
}
}

View File

@ -56,9 +56,8 @@ public class PipelineStoreTests extends ESTestCase {
@Before
public void init() throws Exception {
store = new PipelineStore(Settings.EMPTY);
ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder();
registryBuilder.registerProcessor("set", (registry) -> config -> {
Map<String, Processor.Factory> processorFactories = new HashMap<>();
processorFactories.put("set", (factories, config) -> {
String field = (String) config.remove("field");
String value = (String) config.remove("value");
return new Processor() {
@ -78,7 +77,7 @@ public class PipelineStoreTests extends ESTestCase {
}
};
});
registryBuilder.registerProcessor("remove", (registry) -> config -> {
processorFactories.put("remove", (factories, config) -> {
String field = (String) config.remove("field");
return new Processor() {
@Override
@ -97,7 +96,7 @@ public class PipelineStoreTests extends ESTestCase {
}
};
});
store.buildProcessorFactoryRegistry(registryBuilder, mock(ScriptService.class), mock(ClusterService.class));
store = new PipelineStore(Settings.EMPTY, processorFactories);
}
public void testUpdatePipelines() {

View File

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.mockito.Mockito.mock;
public class ProcessorsRegistryTests extends ESTestCase {
public void testBuildProcessorRegistry() {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
TestProcessor.Factory factory1 = new TestProcessor.Factory();
builder.registerProcessor("1", (registry) -> factory1);
TestProcessor.Factory factory2 = new TestProcessor.Factory();
builder.registerProcessor("2", (registry) -> factory2);
TestProcessor.Factory factory3 = new TestProcessor.Factory();
try {
builder.registerProcessor("1", (registry) -> factory3);
fail("addProcessor should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]"));
}
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));
assertThat(registry.getProcessorFactories().size(), equalTo(2));
assertThat(registry.getProcessorFactory("1"), sameInstance(factory1));
assertThat(registry.getProcessorFactory("2"), sameInstance(factory2));
}
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.Map;
@ -54,7 +54,7 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
protected abstract String process(String value);
static abstract class Factory<T extends AbstractStringProcessor> extends AbstractProcessorFactory<T> {
static abstract class Factory extends AbstractProcessorFactory {
protected final String processorType;
protected Factory(String processorType) {
@ -62,11 +62,11 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
}
@Override
public T doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public AbstractStringProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field");
return newProcessor(processorTag, field);
}
protected abstract T newProcessor(String processorTag, String field);
protected abstract AbstractStringProcessor newProcessor(String processorTag, String field);
}
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
@ -65,7 +65,7 @@ public final class AppendProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<AppendProcessor> {
public static final class Factory extends AbstractProcessorFactory {
private final TemplateService templateService;
@ -74,7 +74,8 @@ public final class AppendProcessor extends AbstractProcessor {
}
@Override
public AppendProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public AppendProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.List;
@ -161,9 +161,10 @@ public final class ConvertProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<ConvertProcessor> {
public static final class Factory extends AbstractProcessorFactory {
@Override
public ConvertProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public ConvertProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", field);

View File

@ -19,17 +19,6 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IllformedLocaleException;
@ -38,6 +27,17 @@ import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
public final class DateIndexNameProcessor extends AbstractProcessor {
public static final String TYPE = "date_index_name";
@ -121,10 +121,11 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
return dateFormats;
}
public static final class Factory extends AbstractProcessorFactory<DateIndexNameProcessor> {
public static final class Factory extends AbstractProcessorFactory {
@Override
protected DateIndexNameProcessor doCreate(ProcessorsRegistry registry, String tag, Map<String, Object> config) throws Exception {
protected DateIndexNameProcessor doCreate(Map<String, Processor.Factory> registry, String tag,
Map<String, Object> config) throws Exception {
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "locale");
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);

View File

@ -24,7 +24,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
@ -109,10 +109,11 @@ public final class DateProcessor extends AbstractProcessor {
return formats;
}
public static final class Factory extends AbstractProcessorFactory<DateProcessor> {
public static final class Factory extends AbstractProcessorFactory {
@SuppressWarnings("unchecked")
public DateProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public DateProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone");

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import java.util.Map;
@ -57,7 +57,7 @@ public final class FailProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<FailProcessor> {
public static final class Factory extends AbstractProcessorFactory {
private final TemplateService templateService;
@ -66,7 +66,8 @@ public final class FailProcessor extends AbstractProcessor {
}
@Override
public FailProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public FailProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message");
return new FailProcessor(processorTag, templateService.compile(message));
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.Collections;
@ -83,12 +83,13 @@ public final class ForEachProcessor extends AbstractProcessor {
return processors;
}
public static final class Factory extends AbstractProcessorFactory<ForEachProcessor> {
public static final class Factory extends AbstractProcessorFactory {
@Override
protected ForEachProcessor doCreate(String tag, Map<String, Object> config) throws Exception {
protected ForEachProcessor doCreate(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
List<Map<String, Map<String, Object>>> processorConfigs = readList(TYPE, tag, config, "processors");
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, factories);
return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors));
}
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.HashMap;
import java.util.List;
@ -115,7 +115,7 @@ public final class GrokProcessor extends AbstractProcessor {
return combinedPattern;
}
public final static class Factory extends AbstractProcessorFactory<GrokProcessor> {
public final static class Factory extends AbstractProcessorFactory {
private final Map<String, String> builtinPatterns;
@ -124,7 +124,8 @@ public final class GrokProcessor extends AbstractProcessor {
}
@Override
public GrokProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public GrokProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
List<String> matchPatterns = ConfigurationUtils.readList(TYPE, processorTag, config, "patterns");
boolean traceMatch = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trace_match", false);

View File

@ -22,7 +22,7 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.Map;
import java.util.regex.Matcher;
@ -79,9 +79,10 @@ public final class GsubProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<GsubProcessor> {
public static final class Factory extends AbstractProcessorFactory {
@Override
public GsubProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public GsubProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, processorTag, config, "field");
String pattern = readStringProperty(TYPE, processorTag, config, "pattern");
String replacement = readStringProperty(TYPE, processorTag, config, "replacement");

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.List;
import java.util.Map;
@ -71,9 +71,10 @@ public final class JoinProcessor extends AbstractProcessor {
return TYPE;
}
public final static class Factory extends AbstractProcessorFactory<JoinProcessor> {
public final static class Factory extends AbstractProcessorFactory {
@Override
public JoinProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public JoinProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator");
return new JoinProcessor(processorTag, field, separator);

View File

@ -44,7 +44,7 @@ public final class LowercaseProcessor extends AbstractStringProcessor {
return TYPE;
}
public final static class Factory extends AbstractStringProcessor.Factory<LowercaseProcessor> {
public final static class Factory extends AbstractStringProcessor.Factory {
public Factory() {
super(TYPE);

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import java.util.Map;
@ -56,7 +56,7 @@ public final class RemoveProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<RemoveProcessor> {
public static final class Factory extends AbstractProcessorFactory {
private final TemplateService templateService;
@ -65,7 +65,8 @@ public final class RemoveProcessor extends AbstractProcessor {
}
@Override
public RemoveProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public RemoveProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new RemoveProcessor(processorTag, templateService.compile(field));
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.Map;
@ -76,9 +76,10 @@ public final class RenameProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<RenameProcessor> {
public static final class Factory extends AbstractProcessorFactory {
@Override
public RenameProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public RenameProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field");
return new RenameProcessor(processorTag, field, targetField);

View File

@ -26,7 +26,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
@ -78,7 +78,7 @@ public final class ScriptProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<ScriptProcessor> {
public static final class Factory extends AbstractProcessorFactory {
private final ScriptService scriptService;
@ -87,7 +87,8 @@ public final class ScriptProcessor extends AbstractProcessor {
}
@Override
public ScriptProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public ScriptProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = readOptionalStringProperty(TYPE, processorTag, config, "field");
String lang = readStringProperty(TYPE, processorTag, config, "lang");
String inline = readOptionalStringProperty(TYPE, processorTag, config, "inline");

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
@ -76,7 +76,7 @@ public final class SetProcessor extends AbstractProcessor {
return TYPE;
}
public static final class Factory extends AbstractProcessorFactory<SetProcessor> {
public static final class Factory extends AbstractProcessorFactory {
private final TemplateService templateService;
@ -85,7 +85,8 @@ public final class SetProcessor extends AbstractProcessor {
}
@Override
public SetProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public SetProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true);

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.Collections;
import java.util.List;
@ -112,10 +112,11 @@ public final class SortProcessor extends AbstractProcessor {
return TYPE;
}
public final static class Factory extends AbstractProcessorFactory<SortProcessor> {
public final static class Factory extends AbstractProcessorFactory {
@Override
public SortProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public SortProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, FIELD);
try {
SortOrder direction = SortOrder.fromString(

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.Collections;
@ -73,9 +73,10 @@ public final class SplitProcessor extends AbstractProcessor {
return TYPE;
}
public static class Factory extends AbstractProcessorFactory<SplitProcessor> {
public static class Factory extends AbstractProcessorFactory {
@Override
public SplitProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public SplitProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator"));
}

View File

@ -41,7 +41,7 @@ public final class TrimProcessor extends AbstractStringProcessor {
return TYPE;
}
public static final class Factory extends AbstractStringProcessor.Factory<TrimProcessor> {
public static final class Factory extends AbstractStringProcessor.Factory {
public Factory() {
super(TYPE);

View File

@ -43,7 +43,7 @@ public final class UppercaseProcessor extends AbstractStringProcessor {
return TYPE;
}
public static final class Factory extends AbstractStringProcessor.Factory<UppercaseProcessor> {
public static final class Factory extends AbstractStringProcessor.Factory {
public Factory() {
super(TYPE);

View File

@ -87,12 +87,12 @@ public class DateIndexNameFactoryTests extends ESTestCase {
DateIndexNameProcessor.Factory factory = new DateIndexNameProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("date_rounding", "y");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
config.clear();
config.put("field", "_field");
e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
assertThat(e.getMessage(), Matchers.equalTo("[date_rounding] required property is missing"));
}

View File

@ -45,7 +45,7 @@ public class FailProcessorFactoryTests extends ESTestCase {
config.put("message", "error");
String processorTag = randomAsciiOfLength(10);
config.put(AbstractProcessorFactory.TAG_KEY, processorTag);
FailProcessor failProcessor = factory.create(null, config);
FailProcessor failProcessor = (FailProcessor)factory.create(null, config);
assertThat(failProcessor.getTag(), equalTo(processorTag));
assertThat(failProcessor.getMessage().execute(Collections.emptyMap()), equalTo("error"));
}

View File

@ -21,7 +21,7 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
@ -37,9 +37,8 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> {});
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put("_name", (r, c) -> processor);
ProcessorsRegistry registry = new ProcessorsRegistry(processors);
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();

View File

@ -50,7 +50,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("patterns", Collections.singletonList("(?<foo>\\w+)"));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
@ -58,7 +58,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
assertThat(e.getMessage(), equalTo("[patterns] required property is missing"));
}
@ -67,7 +67,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
config.put("patterns", Collections.emptyList());
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
assertThat(e.getMessage(), equalTo("[patterns] List of patterns must not be empty"));
}
@ -89,7 +89,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("patterns", Collections.singletonList("["));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
assertThat(e.getMessage(), equalTo("[patterns] Invalid regex pattern found in: [[]. premature end of char-class"));
}
@ -99,7 +99,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!"));
config.put("pattern_definitions", Collections.singletonMap("MY_PATTERN", "["));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config));
assertThat(e.getMessage(),
equalTo("[patterns] Invalid regex pattern found in: [%{MY_PATTERN:name}!]. premature end of char-class"));
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -55,7 +56,7 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
configMap.put("lang", "mockscript");
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> factory.doCreate(randomAsciiOfLength(10), configMap));
() -> factory.doCreate(null, randomAsciiOfLength(10), configMap));
assertThat(exception.getMessage(), is("[null] Only one of [file], [id], or [inline] may be configured"));
}
@ -66,7 +67,7 @@ public class ScriptProcessorFactoryTests extends ESTestCase {
configMap.put("lang", "mockscript");
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> factory.doCreate(randomAsciiOfLength(10), configMap));
() -> factory.doCreate(null, randomAsciiOfLength(10), configMap));
assertThat(exception.getMessage(), is("[null] Need [file], [id], or [inline] parameter to refer to scripts"));
}

View File

@ -37,7 +37,7 @@ public class SplitProcessorFactoryTests extends ESTestCase {
config.put("separator", "\\.");
String processorTag = randomAsciiOfLength(10);
config.put(AbstractProcessorFactory.TAG_KEY, processorTag);
SplitProcessor splitProcessor = factory.create(null, config);
SplitProcessor splitProcessor = (SplitProcessor)factory.create(null, config);
assertThat(splitProcessor.getTag(), equalTo(processorTag));
assertThat(splitProcessor.getField(), equalTo("field1"));
assertThat(splitProcessor.getSeparator(), equalTo("\\."));

View File

@ -84,7 +84,7 @@ public class SplitProcessorTests extends ESTestCase {
Map<String, Object> splitConfig = new HashMap<>();
splitConfig.put("field", "flags");
splitConfig.put("separator", "\\|");
Processor splitProcessor = (new SplitProcessor.Factory()).create(splitConfig);
Processor splitProcessor = (new SplitProcessor.Factory()).create(null, splitConfig);
Map<String, Object> source = new HashMap<>();
source.put("flags", "new|hot|super|fun|interesting");
IngestDocument ingestDocument = new IngestDocument(source, new HashMap<>());

View File

@ -27,7 +27,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.Processor;
import java.io.IOException;
import java.util.Arrays;
@ -151,12 +151,13 @@ public final class AttachmentProcessor extends AbstractProcessor {
return indexedChars;
}
public static final class Factory extends AbstractProcessorFactory<AttachmentProcessor> {
public static final class Factory extends AbstractProcessorFactory {
static final Set<Property> DEFAULT_PROPERTIES = EnumSet.allOf(Property.class);
@Override
public AttachmentProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public AttachmentProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "attachment");
List<String> properyNames = readOptionalList(TYPE, processorTag, config, "properties");

View File

@ -19,15 +19,21 @@
package org.elasticsearch.ingest.attachment;
import org.elasticsearch.node.NodeModule;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
public class IngestAttachmentPlugin extends Plugin implements IngestPlugin {
public class IngestAttachmentPlugin extends Plugin {
public void onModule(NodeModule nodeModule) throws IOException {
nodeModule.registerProcessor(AttachmentProcessor.TYPE,
(registry) -> new AttachmentProcessor.Factory());
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
return Collections.singletonMap(AttachmentProcessor.TYPE, new AttachmentProcessor.Factory());
}
}

View File

@ -19,26 +19,6 @@
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.City;
import com.maxmind.geoip2.record.Continent;
import com.maxmind.geoip2.record.Country;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Subdivision;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ProcessorsRegistry;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
@ -52,6 +32,24 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.City;
import com.maxmind.geoip2.record.Continent;
import com.maxmind.geoip2.record.Country;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Subdivision;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
@ -218,7 +216,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
return geoData;
}
public static final class Factory extends AbstractProcessorFactory<GeoIpProcessor> {
public static final class Factory extends AbstractProcessorFactory {
static final Set<Property> DEFAULT_CITY_PROPERTIES = EnumSet.of(
Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE, Property.REGION_NAME,
Property.CITY_NAME, Property.LOCATION
@ -232,7 +230,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
}
@Override
public GeoIpProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public GeoIpProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb.gz");
@ -240,7 +239,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
DatabaseReader databaseReader = databaseReaders.get(databaseFile);
if (databaseReader == null) {
throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist");
throw newConfigurationException(TYPE, processorTag,
"database_file", "database file [" + databaseFile + "] doesn't exist");
}
String databaseType = databaseReader.getMetadata().getDatabaseType();

View File

@ -49,7 +49,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
@Override
public synchronized Map<String, Processor.Factory> getProcessors(
Environment env, ClusterService clusterService, ScriptService scriptService, TemplateService templateService) {
Environment env, ScriptService scriptService, TemplateService templateService) {
if (databaseReaders != null) {
throw new IllegalStateException("called onModule twice for geoip plugin!!");
}

View File

@ -78,7 +78,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
String processorTag = randomAsciiOfLength(10);
config.put(AbstractProcessorFactory.TAG_KEY, processorTag);
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
@ -96,7 +96,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
String processorTag = randomAsciiOfLength(10);
config.put(AbstractProcessorFactory.TAG_KEY, processorTag);
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
@ -109,7 +109,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("target_field", "_field");
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_field"));
}
@ -119,7 +119,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
@ -171,7 +171,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("properties", fieldNames);
GeoIpProcessor processor = factory.create(null, config);
GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getProperties(), equalTo(properties));
}

View File

@ -19,22 +19,27 @@
package org.elasticsearch.ingest;
import org.elasticsearch.node.NodeModule;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
/**
* Adds an ingest processor to be used in tests.
*/
public class IngestTestPlugin extends Plugin {
public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor("test", (registry) -> config ->
public class IngestTestPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
return Collections.singletonMap("test", (factories, config) ->
new TestProcessor("id", "test", doc -> {
doc.setFieldValue("processed", true);
if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");
}
})
);
}));
}
}

View File

@ -64,9 +64,9 @@ public class TestProcessor implements Processor {
return invokedCounter.get();
}
public static final class Factory extends AbstractProcessorFactory<TestProcessor> {
public static final class Factory extends AbstractProcessorFactory {
@Override
public TestProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map<String, Object> config) throws Exception {
public TestProcessor doCreate(Map<String, Processor.Factory> registry, String processorTag, Map<String, Object> config) throws Exception {
return new TestProcessor(processorTag, "test-processor", ingestDocument -> {});
}
}