Cleanup ingest initialization code.

* Folded IngestModule into NodeModule
* Renamed IngestBootstrapper to IngestService
* Let NodeService construct IngestService and removed the Guice annotations
* Let IngestService implement Closable
This commit is contained in:
Martijn van Groningen 2016-01-14 16:30:44 +01:00
parent 9c06736dbd
commit 21cc0b2316
19 changed files with 145 additions and 194 deletions

View File

@ -197,7 +197,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder; import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestModule; import org.elasticsearch.node.NodeModule;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -228,7 +228,7 @@ public class ActionModule extends AbstractModule {
private final boolean proxy; private final boolean proxy;
public ActionModule(Settings settings, boolean proxy) { public ActionModule(Settings settings, boolean proxy) {
this.ingestEnabled = IngestModule.isIngestEnabled(settings); this.ingestEnabled = NodeModule.isNodeIngestEnabled(settings);
this.proxy = proxy; this.proxy = proxy;
} }

View File

@ -29,8 +29,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -41,9 +41,9 @@ public class DeletePipelineTransportAction extends TransportMasterNodeAction<Del
@Inject @Inject
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = nodeService.getIngestService().getPipelineStore();
} }
@Override @Override

View File

@ -29,8 +29,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -41,9 +41,9 @@ public class GetPipelineTransportAction extends TransportMasterNodeReadAction<Ge
@Inject @Inject
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new); super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = nodeService.getIngestService().getPipelineStore();
} }
@Override @Override

View File

@ -34,8 +34,8 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import java.util.ArrayList; import java.util.ArrayList;
@ -49,9 +49,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
private final PipelineExecutionService executionService; private final PipelineExecutionService executionService;
@Inject @Inject
public IngestActionFilter(Settings settings, IngestBootstrapper bootstrapper) { public IngestActionFilter(Settings settings, NodeService nodeService) {
super(settings); super(settings);
this.executionService = bootstrapper.getPipelineExecutionService(); this.executionService = nodeService.getIngestService().getPipelineExecutionService();
} }
@Override @Override

View File

@ -33,7 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.ingest.IngestModule; import org.elasticsearch.node.NodeModule;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportException;
@ -125,10 +125,10 @@ public final class IngestProxyActionFilter implements ActionFilter {
} }
private DiscoveryNode randomIngestNode() { private DiscoveryNode randomIngestNode() {
assert IngestModule.isIngestEnabled(clusterService.localNode().attributes()) == false; assert NodeModule.isNodeIngestEnabled(clusterService.localNode().attributes()) == false;
List<DiscoveryNode> ingestNodes = new ArrayList<>(); List<DiscoveryNode> ingestNodes = new ArrayList<>();
for (DiscoveryNode node : clusterService.state().nodes()) { for (DiscoveryNode node : clusterService.state().nodes()) {
if (IngestModule.isIngestEnabled(node.getAttributes())) { if (NodeModule.isNodeIngestEnabled(node.getAttributes())) {
ingestNodes.add(node); ingestNodes.add(node);
} }
} }

View File

@ -20,9 +20,7 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -31,8 +29,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -43,9 +41,9 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
@Inject @Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = nodeService.getIngestService().getPipelineStore();
} }
@Override @Override

View File

@ -26,8 +26,8 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -39,9 +39,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
private final SimulateExecutionService executionService; private final SimulateExecutionService executionService;
@Inject @Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IngestBootstrapper bootstrapper) { public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new); super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
this.pipelineStore = bootstrapper.getPipelineStore(); this.pipelineStore = nodeService.getIngestService().getPipelineStore();
this.executionService = new SimulateExecutionService(threadPool); this.executionService = new SimulateExecutionService(threadPool);
} }

View File

@ -1,92 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
import java.util.function.BiFunction;
/**
* Registry for processor factories
* @see Processor.Factory
*/
public class IngestModule extends AbstractModule {
private final ProcessorsRegistry processorsRegistry;
public IngestModule() {
this.processorsRegistry = new ProcessorsRegistry();
registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
}
@Override
protected void configure() {
bind(ProcessorsRegistry.class).toInstance(processorsRegistry);
binder().bind(IngestBootstrapper.class).asEagerSingleton();
}
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) {
processorsRegistry.registerProcessor(type, processorFactoryProvider);
}
public static boolean isIngestEnabled(Settings settings) {
return settings.getAsBoolean("node.ingest", true);
}
public static boolean isIngestEnabled(ImmutableOpenMap<String, String> nodeAttributes) {
String ingestEnabled = nodeAttributes.get("ingest");
//reproduces same logic used in settings.getAsBoolean used above
return Booleans.parseBoolean(ingestEnabled, true);
}
}

View File

@ -19,50 +19,28 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
/** /**
* Instantiates and wires all the services that the ingest plugin will be needing. * Instantiates and wires all the services that the ingest plugin will be needing.
* Also the bootstrapper is in charge of starting and stopping the ingest plugin based on the cluster state. * Also the bootstrapper is in charge of starting and stopping the ingest plugin based on the cluster state.
*/ */
public class IngestBootstrapper extends AbstractLifecycleComponent { public class IngestService implements Closeable {
private final Environment environment; private final Environment environment;
private final PipelineStore pipelineStore; private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService; private final PipelineExecutionService pipelineExecutionService;
private final ProcessorsRegistry processorsRegistry; private final ProcessorsRegistry processorsRegistry;
// TODO(simonw): I would like to stress this abstraction a little more and move it's construction into public IngestService(Settings settings, ThreadPool threadPool, Environment environment,
// NodeService and instead of making it AbstractLifecycleComponent just impl Closeable. ClusterService clusterService, ProcessorsRegistry processorsRegistry) {
// that way we can start the effort of making NodeModule the central point of required service and also move the registration of the
// pipelines into NodeModule? I'd really like to prevent adding yet another module.
@Inject
public IngestBootstrapper(Settings settings, ThreadPool threadPool, Environment environment,
ClusterService clusterService, ProcessorsRegistry processorsRegistry) {
super(settings);
this.environment = environment; this.environment = environment;
this.processorsRegistry = processorsRegistry; this.processorsRegistry = processorsRegistry;
this.pipelineStore = new PipelineStore(settings, clusterService); this.pipelineStore = new PipelineStore(settings, clusterService);
@ -77,26 +55,13 @@ public class IngestBootstrapper extends AbstractLifecycleComponent {
return pipelineExecutionService; return pipelineExecutionService;
} }
@Inject
public void setScriptService(ScriptService scriptService) { public void setScriptService(ScriptService scriptService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService); pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService);
} }
@Override @Override
protected void doStart() { public void close() throws IOException {
} pipelineStore.close();
@Override
protected void doStop() {
}
@Override
protected void doClose() {
try {
pipelineStore.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -71,10 +71,10 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.ingest.IngestModule;
import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.percolator.PercolatorModule; import org.elasticsearch.percolator.PercolatorModule;
import org.elasticsearch.percolator.PercolatorService; import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
@ -196,7 +196,6 @@ public class Node implements Releasable {
modules.add(new RepositoriesModule()); modules.add(new RepositoriesModule());
modules.add(new TribeModule()); modules.add(new TribeModule());
modules.add(new AnalysisModule(environment)); modules.add(new AnalysisModule(environment));
modules.add(new IngestModule());
pluginsService.processModules(modules); pluginsService.processModules(modules);
@ -346,6 +345,12 @@ public class Node implements Releasable {
StopWatch stopWatch = new StopWatch("node_close"); StopWatch stopWatch = new StopWatch("node_close");
stopWatch.start("tribe"); stopWatch.start("tribe");
injector.getInstance(TribeService.class).close(); injector.getInstance(TribeService.class).close();
stopWatch.stop().start("ingest_service");
try {
injector.getInstance(NodeService.class).getIngestService().close();
} catch (IOException e) {
logger.warn("IngestService close failed", e);
}
stopWatch.stop().start("http"); stopWatch.stop().start("http");
if (settings.getAsBoolean("http.enabled", true)) { if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close(); injector.getInstance(HttpServer.class).close();

View File

@ -20,11 +20,33 @@
package org.elasticsearch.node; package org.elasticsearch.node;
import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.service.NodeService;
import java.util.function.BiFunction;
/** /**
* *
*/ */
@ -32,6 +54,7 @@ public class NodeModule extends AbstractModule {
private final Node node; private final Node node;
private final MonitorService monitorService; private final MonitorService monitorService;
private final ProcessorsRegistry processorsRegistry;
// pkg private so tests can mock // pkg private so tests can mock
Class<? extends PageCacheRecycler> pageCacheRecyclerImpl = PageCacheRecycler.class; Class<? extends PageCacheRecycler> pageCacheRecyclerImpl = PageCacheRecycler.class;
@ -40,6 +63,21 @@ public class NodeModule extends AbstractModule {
public NodeModule(Node node, MonitorService monitorService) { public NodeModule(Node node, MonitorService monitorService) {
this.node = node; this.node = node;
this.monitorService = monitorService; this.monitorService = monitorService;
this.processorsRegistry = new ProcessorsRegistry();
registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
} }
@Override @Override
@ -58,5 +96,23 @@ public class NodeModule extends AbstractModule {
bind(Node.class).toInstance(node); bind(Node.class).toInstance(node);
bind(MonitorService.class).toInstance(monitorService); bind(MonitorService.class).toInstance(monitorService);
bind(NodeService.class).asEagerSingleton(); bind(NodeService.class).asEagerSingleton();
bind(ProcessorsRegistry.class).toInstance(processorsRegistry);
}
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) {
processorsRegistry.registerProcessor(type, processorFactoryProvider);
}
public static boolean isNodeIngestEnabled(Settings settings) {
return settings.getAsBoolean("node.ingest", true);
}
public static boolean isNodeIngestEnabled(ImmutableOpenMap<String, String> nodeAttributes) {
String ingestEnabled = nodeAttributes.get("ingest");
//reproduces same logic used in settings.getAsBoolean used above
return Booleans.parseBoolean(ingestEnabled, true);
} }
} }

View File

@ -24,20 +24,27 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.http.HttpServer; import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -55,6 +62,7 @@ public class NodeService extends AbstractComponent {
private final IndicesService indicesService; private final IndicesService indicesService;
private final PluginsService pluginService; private final PluginsService pluginService;
private final CircuitBreakerService circuitBreakerService; private final CircuitBreakerService circuitBreakerService;
private final IngestService ingestService;
private ScriptService scriptService; private ScriptService scriptService;
@Nullable @Nullable
@ -67,10 +75,10 @@ public class NodeService extends AbstractComponent {
private final Discovery discovery; private final Discovery discovery;
@Inject @Inject
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, public NodeService(Settings settings, Environment environment, ThreadPool threadPool, MonitorService monitorService,
TransportService transportService, IndicesService indicesService, Discovery discovery, TransportService transportService, IndicesService indicesService,
PluginsService pluginService, CircuitBreakerService circuitBreakerService, PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version,
Version version) { ProcessorsRegistry processorsRegistry, ClusterService clusterService) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.monitorService = monitorService; this.monitorService = monitorService;
@ -81,12 +89,14 @@ public class NodeService extends AbstractComponent {
this.version = version; this.version = version;
this.pluginService = pluginService; this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService; this.circuitBreakerService = circuitBreakerService;
this.ingestService = new IngestService(settings, threadPool, environment, clusterService, processorsRegistry);
} }
// can not use constructor injection or there will be a circular dependency // can not use constructor injection or there will be a circular dependency
@Inject(optional = true) @Inject(optional = true)
public void setScriptService(ScriptService scriptService) { public void setScriptService(ScriptService scriptService) {
this.scriptService = scriptService; this.scriptService = scriptService;
this.ingestService.setScriptService(scriptService);
} }
public void setHttpServer(@Nullable HttpServer httpServer) { public void setHttpServer(@Nullable HttpServer httpServer) {
@ -176,4 +186,8 @@ public class NodeService extends AbstractComponent {
discoveryStats ? discovery.stats() : null discoveryStats ? discovery.stats() : null
); );
} }
public IngestService getIngestService() {
return ingestService;
}
} }

View File

@ -38,7 +38,6 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.IngestModule;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;

View File

@ -29,13 +29,14 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestBootstrapper; import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.CompoundProcessor; import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -63,8 +64,10 @@ public class IngestActionFilterTests extends ESTestCase {
@Before @Before
public void setup() { public void setup() {
executionService = mock(PipelineExecutionService.class); executionService = mock(PipelineExecutionService.class);
IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class); IngestService ingestService = mock(IngestService.class);
when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
NodeService bootstrapper = mock(NodeService.class);
when(bootstrapper.getIngestService()).thenReturn(ingestService);
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
} }
@ -170,8 +173,10 @@ public class IngestActionFilterTests extends ESTestCase {
}; };
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
executionService = new PipelineExecutionService(store, threadPool); executionService = new PipelineExecutionService(store, threadPool);
IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class); IngestService ingestService = mock(IngestService.class);
when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
NodeService bootstrapper = mock(NodeService.class);
when(bootstrapper.getIngestService()).thenReturn(ingestService);
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
BulkRequest bulkRequest = new BulkRequest(); BulkRequest bulkRequest = new BulkRequest();

View File

@ -33,7 +33,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.ingest.IngestModule; import org.elasticsearch.node.NodeModule;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
@ -245,7 +245,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
@Override @Override
protected boolean matchesSafely(DiscoveryNode node) { protected boolean matchesSafely(DiscoveryNode node) {
return IngestModule.isIngestEnabled(node.getAttributes()); return NodeModule.isNodeIngestEnabled(node.getAttributes());
} }
} }
} }

View File

@ -35,6 +35,7 @@ import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
@ -224,7 +225,7 @@ public class IngestClientIT extends ESIntegTestCase {
return "ingest mock"; return "ingest mock";
} }
public void onModule(IngestModule ingestModule) { public void onModule(NodeModule ingestModule) {
ingestModule.registerProcessor("test", (environment, templateService) -> config -> ingestModule.registerProcessor("test", (environment, templateService) -> config ->
new TestProcessor("test", ingestDocument -> { new TestProcessor("test", ingestDocument -> {
ingestDocument.setFieldValue("processed", true); ingestDocument.setFieldValue("processed", true);

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.ingest; package org.elasticsearch.node;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -25,42 +25,42 @@ import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public class IngestModuleTests extends ESTestCase { public class NodeModuleTests extends ESTestCase {
public void testIsIngestEnabledSettings() { public void testIsNodeIngestEnabledSettings() {
assertThat(IngestModule.isIngestEnabled(Settings.EMPTY), equalTo(true)); assertThat(NodeModule.isNodeIngestEnabled(Settings.EMPTY), equalTo(true));
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", true).build()), equalTo(true)); assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", true).build()), equalTo(true));
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "true").build()), equalTo(true)); assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "true").build()), equalTo(true));
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", false).build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", false).build()), equalTo(false));
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "false").build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "false").build()), equalTo(false));
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "off").build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "off").build()), equalTo(false));
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "no").build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "no").build()), equalTo(false));
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "0").build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(Settings.builder().put("node.ingest", "0").build()), equalTo(false));
} }
public void testIsIngestEnabledAttributes() { public void testIsIngestEnabledAttributes() {
assertThat(IngestModule.isIngestEnabled(ImmutableOpenMap.<String, String>builder().build()), equalTo(true)); assertThat(NodeModule.isNodeIngestEnabled(ImmutableOpenMap.<String, String>builder().build()), equalTo(true));
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.<String, String>builder(); ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.<String, String>builder();
builder.put("ingest", "true"); builder.put("ingest", "true");
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(true)); assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(true));
builder = ImmutableOpenMap.<String, String>builder(); builder = ImmutableOpenMap.<String, String>builder();
builder.put("ingest", "false"); builder.put("ingest", "false");
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false));
builder = ImmutableOpenMap.<String, String>builder(); builder = ImmutableOpenMap.<String, String>builder();
builder.put("ingest", "off"); builder.put("ingest", "off");
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false));
builder = ImmutableOpenMap.<String, String>builder(); builder = ImmutableOpenMap.<String, String>builder();
builder.put("ingest", "no"); builder.put("ingest", "no");
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false));
builder = ImmutableOpenMap.<String, String>builder(); builder = ImmutableOpenMap.<String, String>builder();
builder.put("ingest", "0"); builder.put("ingest", "0");
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false)); assertThat(NodeModule.isNodeIngestEnabled(builder.build()), equalTo(false));
} }
public void testIsIngestEnabledMethodsReturnTheSameValue() { public void testIsIngestEnabledMethodsReturnTheSameValue() {
@ -75,6 +75,6 @@ public class IngestModuleTests extends ESTestCase {
builder.put("ingest", randomString); builder.put("ingest", randomString);
ImmutableOpenMap<String, String> attributes = builder.build(); ImmutableOpenMap<String, String> attributes = builder.build();
assertThat(IngestModule.isIngestEnabled(settings), equalTo(IngestModule.isIngestEnabled(attributes))); assertThat(NodeModule.isNodeIngestEnabled(settings), equalTo(NodeModule.isNodeIngestEnabled(attributes)));
} }
} }

View File

@ -19,7 +19,7 @@
package org.elasticsearch.ingest.grok; package org.elasticsearch.ingest.grok;
import org.elasticsearch.ingest.IngestModule; import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -55,8 +55,8 @@ public class IngestGrokPlugin extends Plugin {
return "Ingest processor that uses grok patterns to split text"; return "Ingest processor that uses grok patterns to split text";
} }
public void onModule(IngestModule ingestModule) { public void onModule(NodeModule nodeModule) {
ingestModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns)); nodeModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns));
} }
static Map<String, String> loadBuiltinPatterns() throws IOException { static Map<String, String> loadBuiltinPatterns() throws IOException {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.ingest.geoip; package org.elasticsearch.ingest.geoip;
import org.elasticsearch.ingest.IngestModule; import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
public class IngestGeoIpPlugin extends Plugin { public class IngestGeoIpPlugin extends Plugin {
@ -34,7 +34,7 @@ public class IngestGeoIpPlugin extends Plugin {
return "Plugin that allows to plug in ingest processors"; return "Plugin that allows to plug in ingest processors";
} }
public void onModule(IngestModule ingestModule) { public void onModule(NodeModule nodeModule) {
ingestModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
} }
} }