Provide access to ThreadContext in ingest plugins

Also introduced a `Processor.Parameters` class that is holder for several services processors rely on,
the  IngestPlugin#getProcessors(...) method has been changed to accept `Processor.Parameters` instead
of each service seperately.
This commit is contained in:
Martijn van Groningen 2016-07-14 11:12:33 +02:00
parent 9b6e2a8e2f
commit d0069f0fbb
9 changed files with 65 additions and 48 deletions

View File

@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
@ -43,9 +42,11 @@ public class IngestService {
public IngestService(Settings settings, ThreadPool threadPool,
Environment env, ScriptService scriptService, List<IngestPlugin> ingestPlugins) {
final TemplateService templateService = new InternalTemplateService(scriptService);
Processor.Parameters parameters = new Processor.Parameters(env, scriptService, templateService,
threadPool.getThreadContext());
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (IngestPlugin ingestPlugin : ingestPlugins) {
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(env, scriptService, templateService);
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(parameters);
for (Map.Entry<String, Processor.Factory> entry : newProcessors.entrySet()) {
if (processorFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered");

View File

@ -19,6 +19,10 @@
package org.elasticsearch.ingest;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.script.ScriptService;
import java.util.Map;
/**
@ -60,4 +64,41 @@ public interface Processor {
Processor create(Map<String, Processor.Factory> processorFactories, String tag,
Map<String, Object> config) throws Exception;
}
/**
* Infrastructure class that holds services that can be used by processor factories to create processor instances
* and that gets passed around to all {@link org.elasticsearch.plugins.IngestPlugin}s.
*/
class Parameters {
/**
* Useful to provide access to the node's environment like config directory to processor factories.
*/
public final Environment env;
/**
* Provides processors script support.
*/
public final ScriptService scriptService;
/**
* Provides template support to pipeline settings.
*/
public final TemplateService templateService;
/**
* Allows processors to read headers set by {@link org.elasticsearch.action.support.ActionFilter}
* instances that have run prior to in ingest.
*/
public final ThreadContext threadContext;
public Parameters(Environment env, ScriptService scriptService, TemplateService templateService,
ThreadContext threadContext) {
this.env = env;
this.scriptService = scriptService;
this.templateService = templateService;
this.threadContext = threadContext;
}
}
}

View File

@ -22,11 +22,7 @@ package org.elasticsearch.plugins;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.script.ScriptService;
/**
* An extension point for {@link Plugin} implementations to add custom ingest processors
@ -40,8 +36,7 @@ public interface IngestPlugin {
* in pipeline configurations, and the value is a {@link org.elasticsearch.ingest.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
default Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}
}

View File

@ -24,29 +24,31 @@ import java.util.Collections;
import java.util.Map;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.mockito.Mockito;
public class IngestServiceTests extends ESTestCase {
private final IngestPlugin DUMMY_PLUGIN = new IngestPlugin() {
@Override
public Map<String, Processor.Factory> getProcessors(Environment env, ScriptService scriptService, TemplateService templateService) {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap("foo", (factories, tag, config) -> null);
}
};
public void testIngestPlugin() {
IngestService ingestService = new IngestService(Settings.EMPTY, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
ThreadPool tp = Mockito.mock(ThreadPool.class);
IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, Collections.singletonList(DUMMY_PLUGIN));
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
assertTrue(factories.containsKey("foo"));
assertEquals(1, factories.size());
}
public void testIngestPluginDuplicate() {
ThreadPool tp = Mockito.mock(ThreadPool.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new IngestService(Settings.EMPTY, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
new IngestService(Settings.EMPTY, tp, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
);
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
}

View File

@ -28,12 +28,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
public class IngestCommonPlugin extends Plugin implements IngestPlugin {
@ -44,14 +41,13 @@ public class IngestCommonPlugin extends Plugin implements IngestPlugin {
}
@Override
public Map<String, Processor.Factory> getProcessors(Environment env, ScriptService scriptService,
TemplateService templateService) {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(DateProcessor.TYPE, new DateProcessor.Factory());
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(templateService));
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(templateService));
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.templateService));
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.templateService));
processors.put(RenameProcessor.TYPE, new RenameProcessor.Factory());
processors.put(RemoveProcessor.TYPE, new RemoveProcessor.Factory(templateService));
processors.put(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.templateService));
processors.put(SplitProcessor.TYPE, new SplitProcessor.Factory());
processors.put(JoinProcessor.TYPE, new JoinProcessor.Factory());
processors.put(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory());
@ -59,12 +55,12 @@ public class IngestCommonPlugin extends Plugin implements IngestPlugin {
processors.put(TrimProcessor.TYPE, new TrimProcessor.Factory());
processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory());
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(templateService));
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.templateService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory());
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory());
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(builtinPatterns));
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(scriptService));
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
return Collections.unmodifiableMap(processors);
}

View File

@ -22,18 +22,14 @@ package org.elasticsearch.ingest.attachment;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
public class IngestAttachmentPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(AttachmentProcessor.TYPE, new AttachmentProcessor.Factory());
}
}

View File

@ -35,25 +35,20 @@ import java.util.zip.GZIPInputStream;
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
private Map<String, DatabaseReader> databaseReaders;
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
if (databaseReaders != null) {
throw new IllegalStateException("called onModule twice for geoip plugin!!");
}
Path geoIpConfigDirectory = env.configFile().resolve("ingest-geoip");
Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip");
try {
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
} catch (IOException e) {

View File

@ -20,14 +20,9 @@
package org.elasticsearch.ingest.useragent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.io.InputStream;
@ -48,16 +43,15 @@ public class IngestUserAgentPlugin extends Plugin implements IngestPlugin {
static final String DEFAULT_PARSER_NAME = "_default_";
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
Path userAgentConfigDirectory = env.configFile().resolve("ingest-user-agent");
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Path userAgentConfigDirectory = parameters.env.configFile().resolve("ingest-user-agent");
if (Files.exists(userAgentConfigDirectory) == false && Files.isDirectory(userAgentConfigDirectory)) {
throw new IllegalStateException(
"the user agent directory [" + userAgentConfigDirectory + "] containing the regex file doesn't exist");
}
long cacheSize = CACHE_SIZE_SETTING.get(env.settings());
long cacheSize = CACHE_SIZE_SETTING.get(parameters.env.settings());
Map<String, UserAgentParser> userAgentParsers;
try {
userAgentParsers = createUserAgentParsers(userAgentConfigDirectory, new UserAgentCache(cacheSize));

View File

@ -22,18 +22,15 @@ package org.elasticsearch.ingest;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
/**
* Adds an ingest processor to be used in tests.
*/
public class IngestTestPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(
Environment env, ScriptService scriptService, TemplateService templateService) {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap("test", (factories, tag, config) ->
new TestProcessor("id", "test", doc -> {
doc.setFieldValue("processed", true);