Make it possible for Ingest Processors to access AnalysisRegistry
The analysis registry will be used in PMML plugin ingest processor.
This commit is contained in:
parent
b8f4c92d41
commit
a68083f5cb
|
@ -27,6 +27,7 @@ import java.util.Map;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||||
import org.elasticsearch.plugins.IngestPlugin;
|
import org.elasticsearch.plugins.IngestPlugin;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
@ -40,10 +41,12 @@ public class IngestService {
|
||||||
private final PipelineExecutionService pipelineExecutionService;
|
private final PipelineExecutionService pipelineExecutionService;
|
||||||
|
|
||||||
public IngestService(Settings settings, ThreadPool threadPool,
|
public IngestService(Settings settings, ThreadPool threadPool,
|
||||||
Environment env, ScriptService scriptService, List<IngestPlugin> ingestPlugins) {
|
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
|
||||||
|
List<IngestPlugin> ingestPlugins) {
|
||||||
|
|
||||||
final TemplateService templateService = new InternalTemplateService(scriptService);
|
final TemplateService templateService = new InternalTemplateService(scriptService);
|
||||||
Processor.Parameters parameters = new Processor.Parameters(env, scriptService, templateService,
|
Processor.Parameters parameters = new Processor.Parameters(env, scriptService, templateService,
|
||||||
threadPool.getThreadContext());
|
analysisRegistry, threadPool.getThreadContext());
|
||||||
Map<String, Processor.Factory> processorFactories = new HashMap<>();
|
Map<String, Processor.Factory> processorFactories = new HashMap<>();
|
||||||
for (IngestPlugin ingestPlugin : ingestPlugins) {
|
for (IngestPlugin ingestPlugin : ingestPlugins) {
|
||||||
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(parameters);
|
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(parameters);
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -86,6 +87,11 @@ public interface Processor {
|
||||||
*/
|
*/
|
||||||
public final TemplateService templateService;
|
public final TemplateService templateService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provide analyzer support
|
||||||
|
*/
|
||||||
|
public final AnalysisRegistry analysisRegistry;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allows processors to read headers set by {@link org.elasticsearch.action.support.ActionFilter}
|
* Allows processors to read headers set by {@link org.elasticsearch.action.support.ActionFilter}
|
||||||
* instances that have run prior to in ingest.
|
* instances that have run prior to in ingest.
|
||||||
|
@ -93,11 +99,12 @@ public interface Processor {
|
||||||
public final ThreadContext threadContext;
|
public final ThreadContext threadContext;
|
||||||
|
|
||||||
public Parameters(Environment env, ScriptService scriptService, TemplateService templateService,
|
public Parameters(Environment env, ScriptService scriptService, TemplateService templateService,
|
||||||
ThreadContext threadContext) {
|
AnalysisRegistry analysisRegistry, ThreadContext threadContext) {
|
||||||
this.env = env;
|
this.env = env;
|
||||||
this.scriptService = scriptService;
|
this.scriptService = scriptService;
|
||||||
this.templateService = templateService;
|
this.templateService = templateService;
|
||||||
this.threadContext = threadContext;
|
this.threadContext = threadContext;
|
||||||
|
this.analysisRegistry = analysisRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -311,7 +311,7 @@ public class Node implements Closeable {
|
||||||
final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId());
|
final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId());
|
||||||
resourcesToClose.add(tribeService);
|
resourcesToClose.add(tribeService);
|
||||||
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
|
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
|
||||||
scriptModule.getScriptService(), pluginsService.filterPlugins(IngestPlugin.class));
|
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
|
||||||
|
|
||||||
ModulesBuilder modules = new ModulesBuilder();
|
ModulesBuilder modules = new ModulesBuilder();
|
||||||
// plugin modules must be added here, before others or we can get crazy injection errors...
|
// plugin modules must be added here, before others or we can get crazy injection errors...
|
||||||
|
|
|
@ -39,7 +39,7 @@ public class IngestServiceTests extends ESTestCase {
|
||||||
|
|
||||||
public void testIngestPlugin() {
|
public void testIngestPlugin() {
|
||||||
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
||||||
IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, Collections.singletonList(DUMMY_PLUGIN));
|
IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
|
||||||
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
|
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
|
||||||
assertTrue(factories.containsKey("foo"));
|
assertTrue(factories.containsKey("foo"));
|
||||||
assertEquals(1, factories.size());
|
assertEquals(1, factories.size());
|
||||||
|
@ -48,7 +48,7 @@ public class IngestServiceTests extends ESTestCase {
|
||||||
public void testIngestPluginDuplicate() {
|
public void testIngestPluginDuplicate() {
|
||||||
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
||||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||||
new IngestService(Settings.EMPTY, tp, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
|
new IngestService(Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
|
||||||
);
|
);
|
||||||
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
|
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue