From 21cc0b231669b973956c7fd19fe269061e2cf657 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 14 Jan 2016 16:30:44 +0100 Subject: [PATCH] Cleanup ingest initialization code. * Folded IngestModule into NodeModule * Renamed IngestBootstrapper to IngestService * Let NodeService construct IngestService and removed the Guice annotations * Let IngestService implement Closable --- .../elasticsearch/action/ActionModule.java | 4 +- .../ingest/DeletePipelineTransportAction.java | 6 +- .../ingest/GetPipelineTransportAction.java | 6 +- .../action/ingest/IngestActionFilter.java | 6 +- .../ingest/IngestProxyActionFilter.java | 6 +- .../ingest/PutPipelineTransportAction.java | 8 +- .../SimulatePipelineTransportAction.java | 6 +- .../elasticsearch/ingest/IngestModule.java | 92 ------------------- ...stBootstrapper.java => IngestService.java} | 47 ++-------- .../java/org/elasticsearch/node/Node.java | 9 +- .../org/elasticsearch/node/NodeModule.java | 56 +++++++++++ .../node/service/NodeService.java | 22 ++++- .../elasticsearch/threadpool/ThreadPool.java | 1 - .../ingest/IngestActionFilterTests.java | 15 ++- .../ingest/IngestProxyActionFilterTests.java | 4 +- .../elasticsearch/ingest/IngestClientIT.java | 3 +- .../NodeModuleTests.java} | 36 ++++---- .../ingest/grok/IngestGrokPlugin.java | 6 +- .../ingest/geoip/IngestGeoIpPlugin.java | 6 +- 19 files changed, 145 insertions(+), 194 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/ingest/IngestModule.java rename core/src/main/java/org/elasticsearch/ingest/{IngestBootstrapper.java => IngestService.java} (52%) rename core/src/test/java/org/elasticsearch/{ingest/IngestModuleTests.java => node/NodeModuleTests.java} (56%) diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index c784eba55dc..b6acfe8dd84 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -197,7 +197,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.IngestModule; +import org.elasticsearch.node.NodeModule; import java.util.ArrayList; import java.util.HashMap; @@ -228,7 +228,7 @@ public class ActionModule extends AbstractModule { private final boolean proxy; public ActionModule(Settings settings, boolean proxy) { - this.ingestEnabled = IngestModule.isIngestEnabled(settings); + this.ingestEnabled = NodeModule.isNodeIngestEnabled(settings); this.proxy = proxy; } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java index 03f63ff26b7..4f270572df4 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java @@ -29,8 +29,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.IngestBootstrapper; import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -41,9 +41,9 @@ public class DeletePipelineTransportAction extends TransportMasterNodeAction ingestNodes = new ArrayList<>(); for (DiscoveryNode node : clusterService.state().nodes()) { - if (IngestModule.isIngestEnabled(node.getAttributes())) { + if (NodeModule.isNodeIngestEnabled(node.getAttributes())) { ingestNodes.add(node); } } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 3b9e738f69b..123a5c59038 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -20,9 +20,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -31,8 +29,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.IngestBootstrapper; import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -43,9 +41,9 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction new DateProcessor.Factory()); - registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); - registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); - registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); - registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); - registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); - registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); - registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); - registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory()); - registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); - registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); - registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); - registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); - } - - @Override - protected void configure() { - bind(ProcessorsRegistry.class).toInstance(processorsRegistry); - binder().bind(IngestBootstrapper.class).asEagerSingleton(); - } - - /** - * Adds a processor factory under a specific type name. - */ - public void registerProcessor(String type, BiFunction> processorFactoryProvider) { - processorsRegistry.registerProcessor(type, processorFactoryProvider); - } - - public static boolean isIngestEnabled(Settings settings) { - return settings.getAsBoolean("node.ingest", true); - } - - public static boolean isIngestEnabled(ImmutableOpenMap nodeAttributes) { - String ingestEnabled = nodeAttributes.get("ingest"); - //reproduces same logic used in settings.getAsBoolean used above - return Booleans.parseBoolean(ingestEnabled, true); - } -} diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java similarity index 52% rename from core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java rename to core/src/main/java/org/elasticsearch/ingest/IngestService.java index cf145821859..da26903a056 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,50 +19,28 @@ package org.elasticsearch.ingest; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.Environment; -import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; /** * Instantiates and wires all the services that the ingest plugin will be needing. * Also the bootstrapper is in charge of starting and stopping the ingest plugin based on the cluster state. */ -public class IngestBootstrapper extends AbstractLifecycleComponent { +public class IngestService implements Closeable { private final Environment environment; private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; private final ProcessorsRegistry processorsRegistry; - // TODO(simonw): I would like to stress this abstraction a little more and move it's construction into - // NodeService and instead of making it AbstractLifecycleComponent just impl Closeable. - // that way we can start the effort of making NodeModule the central point of required service and also move the registration of the - // pipelines into NodeModule? I'd really like to prevent adding yet another module. - @Inject - public IngestBootstrapper(Settings settings, ThreadPool threadPool, Environment environment, - ClusterService clusterService, ProcessorsRegistry processorsRegistry) { - super(settings); + public IngestService(Settings settings, ThreadPool threadPool, Environment environment, + ClusterService clusterService, ProcessorsRegistry processorsRegistry) { this.environment = environment; this.processorsRegistry = processorsRegistry; this.pipelineStore = new PipelineStore(settings, clusterService); @@ -77,26 +55,13 @@ public class IngestBootstrapper extends AbstractLifecycleComponent { return pipelineExecutionService; } - @Inject public void setScriptService(ScriptService scriptService) { pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService); } @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - try { - pipelineStore.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } + public void close() throws IOException { + pipelineStore.close(); } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 2c4bcbc84a8..ce9a3742876 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -71,10 +71,10 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; -import org.elasticsearch.ingest.IngestModule; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.percolator.PercolatorModule; import org.elasticsearch.percolator.PercolatorService; import org.elasticsearch.plugins.Plugin; @@ -196,7 +196,6 @@ public class Node implements Releasable { modules.add(new RepositoriesModule()); modules.add(new TribeModule()); modules.add(new AnalysisModule(environment)); - modules.add(new IngestModule()); pluginsService.processModules(modules); @@ -346,6 +345,12 @@ public class Node implements Releasable { StopWatch stopWatch = new StopWatch("node_close"); stopWatch.start("tribe"); injector.getInstance(TribeService.class).close(); + stopWatch.stop().start("ingest_service"); + try { + injector.getInstance(NodeService.class).getIngestService().close(); + } catch (IOException e) { + logger.warn("IngestService close failed", e); + } stopWatch.stop().start("http"); if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).close(); diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index aa52d389340..c1b707bfd0f 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -20,11 +20,33 @@ package org.elasticsearch.node; import org.elasticsearch.cache.recycler.PageCacheRecycler; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.ingest.core.TemplateService; +import org.elasticsearch.ingest.processor.AppendProcessor; +import org.elasticsearch.ingest.processor.ConvertProcessor; +import org.elasticsearch.ingest.processor.DateProcessor; +import org.elasticsearch.ingest.processor.FailProcessor; +import org.elasticsearch.ingest.processor.GsubProcessor; +import org.elasticsearch.ingest.processor.JoinProcessor; +import org.elasticsearch.ingest.processor.LowercaseProcessor; +import org.elasticsearch.ingest.processor.RemoveProcessor; +import org.elasticsearch.ingest.processor.RenameProcessor; +import org.elasticsearch.ingest.processor.SetProcessor; +import org.elasticsearch.ingest.processor.SplitProcessor; +import org.elasticsearch.ingest.processor.TrimProcessor; +import org.elasticsearch.ingest.processor.UppercaseProcessor; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.service.NodeService; +import java.util.function.BiFunction; + /** * */ @@ -32,6 +54,7 @@ public class NodeModule extends AbstractModule { private final Node node; private final MonitorService monitorService; + private final ProcessorsRegistry processorsRegistry; // pkg private so tests can mock Class pageCacheRecyclerImpl = PageCacheRecycler.class; @@ -40,6 +63,21 @@ public class NodeModule extends AbstractModule { public NodeModule(Node node, MonitorService monitorService) { this.node = node; this.monitorService = monitorService; + this.processorsRegistry = new ProcessorsRegistry(); + + registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); + registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); + registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); + registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); + registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); + registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); + registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); + registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); + registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory()); + registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); + registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); + registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); + registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); } @Override @@ -58,5 +96,23 @@ public class NodeModule extends AbstractModule { bind(Node.class).toInstance(node); bind(MonitorService.class).toInstance(monitorService); bind(NodeService.class).asEagerSingleton(); + bind(ProcessorsRegistry.class).toInstance(processorsRegistry); + } + + /** + * Adds a processor factory under a specific type name. + */ + public void registerProcessor(String type, BiFunction> processorFactoryProvider) { + processorsRegistry.registerProcessor(type, processorFactoryProvider); + } + + public static boolean isNodeIngestEnabled(Settings settings) { + return settings.getAsBoolean("node.ingest", true); + } + + public static boolean isNodeIngestEnabled(ImmutableOpenMap nodeAttributes) { + String ingestEnabled = nodeAttributes.get("ingest"); + //reproduces same logic used in settings.getAsBoolean used above + return Booleans.parseBoolean(ingestEnabled, true); } } diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/service/NodeService.java index b4fe59e3473..ecf5cd07e11 100644 --- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -24,20 +24,27 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.env.Environment; import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.PipelineExecutionService; +import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -55,6 +62,7 @@ public class NodeService extends AbstractComponent { private final IndicesService indicesService; private final PluginsService pluginService; private final CircuitBreakerService circuitBreakerService; + private final IngestService ingestService; private ScriptService scriptService; @Nullable @@ -67,10 +75,10 @@ public class NodeService extends AbstractComponent { private final Discovery discovery; @Inject - public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, - TransportService transportService, IndicesService indicesService, - PluginsService pluginService, CircuitBreakerService circuitBreakerService, - Version version) { + public NodeService(Settings settings, Environment environment, ThreadPool threadPool, MonitorService monitorService, + Discovery discovery, TransportService transportService, IndicesService indicesService, + PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version, + ProcessorsRegistry processorsRegistry, ClusterService clusterService) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -81,12 +89,14 @@ public class NodeService extends AbstractComponent { this.version = version; this.pluginService = pluginService; this.circuitBreakerService = circuitBreakerService; + this.ingestService = new IngestService(settings, threadPool, environment, clusterService, processorsRegistry); } // can not use constructor injection or there will be a circular dependency @Inject(optional = true) public void setScriptService(ScriptService scriptService) { this.scriptService = scriptService; + this.ingestService.setScriptService(scriptService); } public void setHttpServer(@Nullable HttpServer httpServer) { @@ -176,4 +186,8 @@ public class NodeService extends AbstractComponent { discoveryStats ? discovery.stats() : null ); } + + public IngestService getIngestService() { + return ingestService; + } } diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 6eca00f2aab..0e6204ddd10 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.ingest.IngestModule; import java.io.IOException; import java.util.ArrayList; diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index 91c6765520c..8f613aeb258 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -29,13 +29,14 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.IngestBootstrapper; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.core.CompoundProcessor; import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -63,8 +64,10 @@ public class IngestActionFilterTests extends ESTestCase { @Before public void setup() { executionService = mock(PipelineExecutionService.class); - IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class); - when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); + IngestService ingestService = mock(IngestService.class); + when(ingestService.getPipelineExecutionService()).thenReturn(executionService); + NodeService bootstrapper = mock(NodeService.class); + when(bootstrapper.getIngestService()).thenReturn(ingestService); filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); } @@ -170,8 +173,10 @@ public class IngestActionFilterTests extends ESTestCase { }; when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); executionService = new PipelineExecutionService(store, threadPool); - IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class); - when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); + IngestService ingestService = mock(IngestService.class); + when(ingestService.getPipelineExecutionService()).thenReturn(executionService); + NodeService bootstrapper = mock(NodeService.class); + when(bootstrapper.getIngestService()).thenReturn(ingestService); filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); BulkRequest bulkRequest = new BulkRequest(); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java index ed1bc7ee0ce..042dacce223 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java @@ -33,7 +33,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.transport.DummyTransportAddress; -import org.elasticsearch.ingest.IngestModule; +import org.elasticsearch.node.NodeModule; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -245,7 +245,7 @@ public class IngestProxyActionFilterTests extends ESTestCase { @Override protected boolean matchesSafely(DiscoveryNode node) { - return IngestModule.isIngestEnabled(node.getAttributes()); + return NodeModule.isNodeIngestEnabled(node.getAttributes()); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 9742dd1b978..251a070409d 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.node.NodeModule; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -224,7 +225,7 @@ public class IngestClientIT extends ESIntegTestCase { return "ingest mock"; } - public void onModule(IngestModule ingestModule) { + public void onModule(NodeModule ingestModule) { ingestModule.registerProcessor("test", (environment, templateService) -> config -> new TestProcessor("test", ingestDocument -> { ingestDocument.setFieldValue("processed", true); diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestModuleTests.java b/core/src/test/java/org/elasticsearch/node/NodeModuleTests.java similarity index 56% rename from core/src/test/java/org/elasticsearch/ingest/IngestModuleTests.java rename to core/src/test/java/org/elasticsearch/node/NodeModuleTests.java index c5abb491161..ad8005d2901 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestModuleTests.java +++ b/core/src/test/java/org/elasticsearch/node/NodeModuleTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.ingest; +package org.elasticsearch.node; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; @@ -25,42 +25,42 @@ import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.equalTo; -public class IngestModuleTests extends ESTestCase { +public class NodeModuleTests extends ESTestCase { - public void testIsIngestEnabledSettings() { - assertThat(IngestModule.isIngestEnabled(Settings.EMPTY), equalTo(true)); - assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", true).build()), equalTo(true)); - assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "true").build()), equalTo(true)); - assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", false).build()), equalTo(false)); + public void testIsNodeIngestEnabledSettings() { + assertThat(NodeModule.isNodeIngestEnabled(Settings.EMPTY), equalTo(true)); + assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", true).build()), equalTo(true)); + assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "true").build()), equalTo(true)); + assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", false).build()), equalTo(false)); - assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "false").build()), equalTo(false)); - assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "off").build()), equalTo(false)); - assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "no").build()), equalTo(false)); - assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "0").build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "false").build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "off").build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "no").build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "0").build()), equalTo(false)); } public void testIsIngestEnabledAttributes() { - assertThat(IngestModule.isIngestEnabled(ImmutableOpenMap.builder().build()), equalTo(true)); + assertThat(NodeModule.isNodeIngestEnabled(ImmutableOpenMap.builder().build()), equalTo(true)); ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); builder.put("ingest", "true"); - assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(true)); + assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(true)); builder = ImmutableOpenMap.builder(); builder.put("ingest", "false"); - assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false)); builder = ImmutableOpenMap.builder(); builder.put("ingest", "off"); - assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false)); builder = ImmutableOpenMap.builder(); builder.put("ingest", "no"); - assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false)); builder = ImmutableOpenMap.builder(); builder.put("ingest", "0"); - assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); + assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false)); } public void testIsIngestEnabledMethodsReturnTheSameValue() { @@ -75,6 +75,6 @@ public class IngestModuleTests extends ESTestCase { builder.put("ingest", randomString); ImmutableOpenMap attributes = builder.build(); - assertThat(IngestModule.isIngestEnabled(settings), equalTo(IngestModule.isIngestEnabled(attributes))); + assertThat(NodeModule.isNodeIngestEnabled(settings), equalTo(NodeModule.isNodeIngestEnabled(attributes))); } } diff --git a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java index f6423fe40cf..9ca5bc24c9a 100644 --- a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java +++ b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.grok; -import org.elasticsearch.ingest.IngestModule; +import org.elasticsearch.node.NodeModule; import org.elasticsearch.plugins.Plugin; import java.io.BufferedReader; @@ -55,8 +55,8 @@ public class IngestGrokPlugin extends Plugin { return "Ingest processor that uses grok patterns to split text"; } - public void onModule(IngestModule ingestModule) { - ingestModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns)); + public void onModule(NodeModule nodeModule) { + nodeModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns)); } static Map loadBuiltinPatterns() throws IOException { diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 81cecb76f5d..4b6a60902ea 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.geoip; -import org.elasticsearch.ingest.IngestModule; +import org.elasticsearch.node.NodeModule; import org.elasticsearch.plugins.Plugin; public class IngestGeoIpPlugin extends Plugin { @@ -34,7 +34,7 @@ public class IngestGeoIpPlugin extends Plugin { return "Plugin that allows to plug in ingest processors"; } - public void onModule(IngestModule ingestModule) { - ingestModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); + public void onModule(NodeModule nodeModule) { + nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); } }