diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc index 42585fe0e90..4f84da7203f 100644 --- a/docs/plugins/ingest.asciidoc +++ b/docs/plugins/ingest.asciidoc @@ -527,8 +527,8 @@ PUT _ingest/pipeline/my-pipeline-id -------------------------------------------------- // AUTOSENSE -NOTE: Each ingest node updates its processors asynchronously in the background, so it may take a few seconds for all - nodes to have the latest version of the pipeline. +NOTE: The put pipeline api also instructs all ingest nodes to reload their in-memory representation of pipelines, so that + pipeline changes take immediately in effect. ==== Get pipeline API diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index a50bed33ea0..fa013024db3 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -35,7 +35,6 @@ import org.elasticsearch.ingest.processor.split.SplitProcessor; import org.elasticsearch.ingest.processor.trim.TrimProcessor; import org.elasticsearch.ingest.processor.uppercase.UppercaseProcessor; import org.elasticsearch.plugin.ingest.rest.IngestRestFilter; -import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService; import java.util.HashMap; import java.util.Map; @@ -47,9 +46,7 @@ public class IngestModule extends AbstractModule { @Override protected void configure() { binder().bind(IngestRestFilter.class).asEagerSingleton(); - binder().bind(PipelineExecutionService.class).asEagerSingleton(); - binder().bind(PipelineStore.class).asEagerSingleton(); - binder().bind(SimulateExecutionService.class).asEagerSingleton(); + binder().bind(PipelineStoreBootstrapper.class).asEagerSingleton(); addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index 1bd68efd87c..d9772eaefc7 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -17,7 +17,6 @@ * under the License. */ - package org.elasticsearch.plugin.ingest; import org.elasticsearch.action.ActionModule; @@ -54,6 +53,7 @@ public class IngestPlugin extends Plugin { public static final String PIPELINE_ID_PARAM = "pipeline_id"; public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed"; public static final String NAME = "ingest"; + public static final String NODE_INGEST_SETTING = "node.ingest"; private final Settings nodeSettings; private final boolean transportClient; @@ -87,7 +87,7 @@ public class IngestPlugin extends Plugin { if (transportClient) { return Collections.emptyList(); } else { - return Collections.singletonList(PipelineStore.class); + return Collections.singletonList(PipelineStoreBootstrapper.class); } } @@ -95,6 +95,8 @@ public class IngestPlugin extends Plugin { public Settings additionalSettings() { return settingsBuilder() .put(PipelineExecutionService.additionalSettings(nodeSettings)) + // TODO: in a followup issue this should be made configurable + .put(NODE_INGEST_SETTING, true) .build(); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java index 18a79104f5a..3e7f405ab51 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -21,7 +21,6 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.ingest.IngestDocument; @@ -37,7 +36,6 @@ public class PipelineExecutionService { private final PipelineStore store; private final ThreadPool threadPool; - @Inject public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { this.store = store; this.threadPool = threadPool; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index 9e36cf17df2..bb6007170cb 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -30,76 +30,59 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.SearchScrollIterator; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Provider; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.env.Environment; -import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest; -import org.elasticsearch.script.*; +import org.elasticsearch.plugin.ingest.transport.reload.ReloadPipelinesAction; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import java.io.Closeable; import java.io.IOException; import java.util.*; -public class PipelineStore extends AbstractLifecycleComponent { +public class PipelineStore extends AbstractComponent implements Closeable { public final static String INDEX = ".ingest"; public final static String TYPE = "pipeline"; - private final ThreadPool threadPool; - private final Environment environment; + private Client client; private final TimeValue scrollTimeout; - private final ClusterService clusterService; - private final Provider clientProvider; - private final TimeValue pipelineUpdateInterval; - private final Provider scriptServiceProvider; + private final ReloadPipelinesAction reloadPipelinesAction; private final Pipeline.Factory factory = new Pipeline.Factory(); - private volatile Map processorFactoryRegistry; - private final Map processorFactoryProviders; + private Map processorFactoryRegistry; - private volatile Client client; + private volatile boolean started = false; private volatile Map pipelines = new HashMap<>(); - @Inject - public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, - Environment environment, ClusterService clusterService, Provider scriptServiceProvider, - Map processorFactoryProviders) { + public PipelineStore(Settings settings, ClusterService clusterService, TransportService transportService) { super(settings); - this.threadPool = threadPool; - this.environment = environment; - this.clusterService = clusterService; - this.clientProvider = clientProvider; - this.scriptServiceProvider = scriptServiceProvider; this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); - this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1)); - this.processorFactoryProviders = processorFactoryProviders; - - clusterService.add(new PipelineStoreListener()); + this.reloadPipelinesAction = new ReloadPipelinesAction(settings, this, clusterService, transportService); } - @Override - protected void doStart() { - // TODO this will be better when #15203 gets in: + public void setClient(Client client) { + this.client = client; + } + + public void buildProcessorFactoryRegistry(Map processorFactoryProviders, Environment environment, ScriptService scriptService) { Map processorFactories = new HashMap<>(); - TemplateService templateService = new InternalTemplateService(scriptServiceProvider.get()); + TemplateService templateService = new InternalTemplateService(scriptService); for (Map.Entry entry : processorFactoryProviders.entrySet()) { Processor.Factory processorFactory = entry.getValue().get(environment, templateService); processorFactories.put(entry.getKey(), processorFactory); @@ -108,35 +91,31 @@ public class PipelineStore extends AbstractLifecycleComponent { } @Override - protected void doStop() { - } - - @Override - protected void doClose() { - // TODO: When org.elasticsearch.node.Node can close Closable instances we should remove this code + public void close() throws IOException { + stop("closing"); + // TODO: When org.elasticsearch.node.Node can close Closable instances we should try to remove this code, + // since any wired closable should be able to close itself List closeables = new ArrayList<>(); for (Processor.Factory factory : processorFactoryRegistry.values()) { if (factory instanceof Closeable) { closeables.add((Closeable) factory); } } - try { - IOUtils.close(closeables); - } catch (IOException e) { - throw new RuntimeException(e); - } + IOUtils.close(closeables); } /** * Deletes the pipeline specified by id in the request. */ public void delete(DeletePipelineRequest request, ActionListener listener) { + ensureReady(); + DeleteRequest deleteRequest = new DeleteRequest(request); deleteRequest.index(PipelineStore.INDEX); deleteRequest.type(PipelineStore.TYPE); deleteRequest.id(request.id()); deleteRequest.refresh(true); - client().delete(deleteRequest, listener); + client.delete(deleteRequest, handleWriteResponseAndReloadPipelines(listener)); } /** @@ -145,6 +124,8 @@ public class PipelineStore extends AbstractLifecycleComponent { * @throws IllegalArgumentException If the pipeline holds incorrect configuration */ public void put(PutPipelineRequest request, ActionListener listener) throws IllegalArgumentException { + ensureReady(); + try { // validates the pipeline and processor configuration: Map pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2(); @@ -159,13 +140,15 @@ public class PipelineStore extends AbstractLifecycleComponent { indexRequest.id(request.id()); indexRequest.source(request.source()); indexRequest.refresh(true); - client().index(indexRequest, listener); + client.index(indexRequest, handleWriteResponseAndReloadPipelines(listener)); } /** * Returns the pipeline by the specified id */ public Pipeline get(String id) { + ensureReady(); + PipelineDefinition ref = pipelines.get(id); if (ref != null) { return ref.getPipeline(); @@ -179,6 +162,8 @@ public class PipelineStore extends AbstractLifecycleComponent { } public List getReference(String... ids) { + ensureReady(); + List result = new ArrayList<>(ids.length); for (String id : ids) { if (Regex.isSimpleMatchPattern(id)) { @@ -197,11 +182,7 @@ public class PipelineStore extends AbstractLifecycleComponent { return result; } - Pipeline constructPipeline(String id, Map config) throws Exception { - return factory.create(id, config, processorFactoryRegistry); - } - - synchronized void updatePipelines() throws Exception { + public synchronized void updatePipelines() throws Exception { // note: this process isn't fast or smart, but the idea is that there will not be many pipelines, // so for that reason the goal is to keep the update logic simple. @@ -210,9 +191,15 @@ public class PipelineStore extends AbstractLifecycleComponent { for (SearchHit hit : readAllPipelines()) { String pipelineId = hit.getId(); BytesReference pipelineSource = hit.getSourceRef(); - PipelineDefinition previous = newPipelines.get(pipelineId); - if (previous != null) { - if (previous.getSource().equals(pipelineSource)) { + PipelineDefinition current = newPipelines.get(pipelineId); + if (current != null) { + // If we first read from a primary shard copy and then from a replica copy, + // and a write did not yet make it into the replica shard + // then the source is not equal but we don't update because the current pipeline is the latest: + if (current.getVersion() > hit.getVersion()) { + continue; + } + if (current.getSource().equals(pipelineSource)) { continue; } } @@ -224,7 +211,7 @@ public class PipelineStore extends AbstractLifecycleComponent { int removed = 0; for (String existingPipelineId : pipelines.keySet()) { - if (!existPipeline(existingPipelineId)) { + if (pipelineExists(existingPipelineId) == false) { newPipelines.remove(existingPipelineId); removed++; } @@ -238,17 +225,46 @@ public class PipelineStore extends AbstractLifecycleComponent { } } - void startUpdateWorker() { - threadPool.schedule(pipelineUpdateInterval, ThreadPool.Names.GENERIC, new Updater()); + private Pipeline constructPipeline(String id, Map config) throws Exception { + return factory.create(id, config, processorFactoryRegistry); } - boolean existPipeline(String pipelineId) { + boolean pipelineExists(String pipelineId) { GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId); - GetResponse response = client().get(request).actionGet(); - return response.isExists(); + try { + GetResponse response = client.get(request).actionGet(); + return response.isExists(); + } catch (IndexNotFoundException e) { + // the ingest index doesn't exist, so the pipeline doesn't either: + return false; + } } - Iterable readAllPipelines() { + synchronized void start() throws Exception { + if (started) { + logger.debug("Pipeline already started"); + } else { + updatePipelines(); + started = true; + logger.debug("Pipeline store started with [{}] pipelines", pipelines.size()); + } + } + + synchronized void stop(String reason) { + if (started) { + started = false; + pipelines = new HashMap<>(); + logger.debug("Pipeline store stopped, reason [{}]", reason); + } else { + logger.debug("Pipeline alreadt stopped"); + } + } + + public boolean isStarted() { + return started; + } + + private Iterable readAllPipelines() { // TODO: the search should be replaced with an ingest API when it is available SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.version(true); @@ -256,40 +272,32 @@ public class PipelineStore extends AbstractLifecycleComponent { SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX); searchRequest.source(sourceBuilder); searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - return SearchScrollIterator.createIterator(client(), scrollTimeout, searchRequest); + return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest); } - private Client client() { - if (client == null) { - client = clientProvider.get(); + private void ensureReady() { + if (started == false) { + throw new IllegalStateException("pipeline store isn't ready yet"); } - return client; } - class Updater implements Runnable { - - @Override - public void run() { - try { - updatePipelines(); - } catch (Exception e) { - logger.error("pipeline store update failure", e); - } finally { - startUpdateWorker(); + @SuppressWarnings("unchecked") + private ActionListener handleWriteResponseAndReloadPipelines(ActionListener listener) { + return new ActionListener() { + @Override + public void onResponse(T result) { + try { + reloadPipelinesAction.reloadPipelinesOnAllNodes(reloadResult -> listener.onResponse(result)); + } catch (Throwable e) { + listener.onFailure(e); + } } - } - } - - class PipelineStoreListener implements ClusterStateListener { - - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) { - startUpdateWorker(); - clusterService.remove(this); + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); } - } + }; } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreBootstrapper.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreBootstrapper.java new file mode 100644 index 00000000000..4432fffcf2d --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreBootstrapper.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Map; + +public class PipelineStoreBootstrapper extends AbstractLifecycleComponent implements ClusterStateListener { + + private final ThreadPool threadPool; + private final Environment environment; + private final PipelineStore pipelineStore; + private final PipelineExecutionService pipelineExecutionService; + private final Map processorFactoryProvider; + + @Inject + public PipelineStoreBootstrapper(Settings settings, ThreadPool threadPool, Environment environment, + ClusterService clusterService, TransportService transportService, + Map processorFactoryProvider) { + super(settings); + this.threadPool = threadPool; + this.environment = environment; + this.processorFactoryProvider = processorFactoryProvider; + this.pipelineStore = new PipelineStore(settings, clusterService, transportService); + this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); + + clusterService.add(this); + } + + // for testing: + PipelineStoreBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService, + PipelineStore pipelineStore, PipelineExecutionService pipelineExecutionService) { + super(settings); + this.threadPool = threadPool; + this.environment = null; + clusterService.add(this); + this.pipelineStore = pipelineStore; + this.pipelineExecutionService = pipelineExecutionService; + this.processorFactoryProvider = null; + } + + public PipelineStore getPipelineStore() { + return pipelineStore; + } + + public PipelineExecutionService getPipelineExecutionService() { + return pipelineExecutionService; + } + + @Inject + public void setClient(Client client) { + pipelineStore.setClient(client); + } + + @Inject + public void setScriptService(ScriptService scriptService) { + pipelineStore.buildProcessorFactoryRegistry(processorFactoryProvider, environment, scriptService); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + + if (pipelineStore.isStarted()) { + if (validClusterState(event.state()) == false) { + stopPipelineStore("cluster state invalid [" + event.state() + "]"); + } + } else { + if (validClusterState(event.state())) { + startPipelineStore(); + } + } + } + + boolean validClusterState(ClusterState state) { + if (state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES) || + state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL)) { + return false; + } + + if (state.getMetaData().hasConcreteIndex(PipelineStore.INDEX)) { + IndexRoutingTable routingTable = state.getRoutingTable().index(PipelineStore.INDEX); + return routingTable.allPrimaryShardsActive(); + } else { + // it will be ready when auto create index kicks in before the first pipeline doc gets added + return true; + } + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + try { + pipelineStore.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + void startPipelineStore() { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + pipelineStore.start(); + } catch (Exception e) { + logger.warn("pipeline store failed to start, retrying...", e); + startPipelineStore(); + } + }); + } + + void stopPipelineStore(String reason) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + pipelineStore.stop(reason); + } catch (Exception e) { + logger.error("pipeline store stop failure", e); + } + }); + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index e552c76d4bf..15cf21b17fb 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineExecutionService; +import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import java.util.*; @@ -41,9 +42,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio private final PipelineExecutionService executionService; @Inject - public IngestActionFilter(Settings settings, PipelineExecutionService executionService) { + public IngestActionFilter(Settings settings, PipelineStoreBootstrapper bootstrapper) { super(settings); - this.executionService = executionService; + this.executionService = bootstrapper.getPipelineExecutionService(); } @Override diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java index 3b5e72c01d4..ce8615a6bfb 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -35,9 +36,9 @@ public class DeletePipelineTransportAction extends HandledTransportAction { + + public static final String ACTION_NAME = "internal:admin/ingest/reload/pipelines"; + + private final ClusterService clusterService; + private final TransportService transportService; + private final PipelineStore pipelineStore; + + public ReloadPipelinesAction(Settings settings, PipelineStore pipelineStore, ClusterService clusterService, TransportService transportService) { + super(settings); + this.pipelineStore = pipelineStore; + this.clusterService = clusterService; + this.transportService = transportService; + transportService.registerRequestHandler(ACTION_NAME, ReloadPipelinesRequest::new, ThreadPool.Names.SAME, this); + } + + public void reloadPipelinesOnAllNodes(Consumer listener) { + List ingestNodes = new ArrayList<>(); + for (DiscoveryNode node : clusterService.state().getNodes()) { + String nodeEnabled = node.getAttributes().get("ingest"); + if ("true".equals(nodeEnabled)) { + ingestNodes.add(node); + } + } + + if (ingestNodes.isEmpty()) { + throw new IllegalStateException("There are no ingest nodes in this cluster"); + } + + AtomicBoolean failed = new AtomicBoolean(); + AtomicInteger expectedResponses = new AtomicInteger(ingestNodes.size()); + for (DiscoveryNode node : ingestNodes) { + ReloadPipelinesRequest nodeRequest = new ReloadPipelinesRequest(); + transportService.sendRequest(node, ACTION_NAME, nodeRequest, new TransportResponseHandler() { + @Override + public ReloadPipelinesResponse newInstance() { + return new ReloadPipelinesResponse(); + } + + @Override + public void handleResponse(ReloadPipelinesResponse response) { + decrementAndReturn(); + } + + @Override + public void handleException(TransportException exp) { + logger.warn("failed to update pipelines on remote node [{}]", exp, node); + failed.set(true); + decrementAndReturn(); + } + + void decrementAndReturn() { + if (expectedResponses.decrementAndGet() == 0) { + listener.accept(!failed.get()); + } + } + + @Override + public String executor() { + return ThreadPool.Names.MANAGEMENT; + } + }); + } + } + + @Override + public void messageReceived(ReloadPipelinesRequest request, TransportChannel channel) throws Exception { + try { + pipelineStore.updatePipelines(); + channel.sendResponse(new ReloadPipelinesResponse()); + } catch (Throwable e) { + logger.warn("failed to update pipelines", e); + channel.sendResponse(e); + } + } + + final static class ReloadPipelinesRequest extends TransportRequest { + + } + + final static class ReloadPipelinesResponse extends TransportResponse { + + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java index fcf6e00c657..430d6fd7234 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java @@ -20,7 +20,6 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; @@ -29,14 +28,13 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; -public class SimulateExecutionService { +class SimulateExecutionService { private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT; private final ThreadPool threadPool; - @Inject - public SimulateExecutionService(ThreadPool threadPool) { + SimulateExecutionService(ThreadPool threadPool) { this.threadPool = threadPool; } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java index b4036c5bac3..4d6fa5f375a 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -34,14 +35,15 @@ import java.io.IOException; import java.util.Map; public class SimulatePipelineTransportAction extends HandledTransportAction { + private final PipelineStore pipelineStore; private final SimulateExecutionService executionService; @Inject - public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore, SimulateExecutionService executionService) { + public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) { super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new); - this.pipelineStore = pipelineStore; - this.executionService = executionService; + this.pipelineStore = bootstrapper.getPipelineStore(); + this.executionService = new SimulateExecutionService(threadPool); } @Override diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 4c27d4d6dca..1ab97961903 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -81,17 +81,12 @@ public class IngestClientIT extends ESIntegTestCase { .endArray() .endObject().bytes()) .get(); - assertBusy(new Runnable() { - @Override - public void run() { - GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) - .setIds("_id") - .get(); - assertThat(response.isFound(), is(true)); - assertThat(response.pipelines().size(), equalTo(1)); - assertThat(response.pipelines().get(0).getId(), equalTo("_id")); - } - }); + GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) + .setIds("_id") + .get(); + assertThat(getResponse.isFound(), is(true)); + assertThat(getResponse.pipelines().size(), equalTo(1)); + assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); SimulatePipelineResponse response = new SimulatePipelineRequestBuilder(client(), SimulatePipelineAction.INSTANCE) .setId("_id") @@ -200,14 +195,12 @@ public class IngestClientIT extends ESIntegTestCase { .endArray() .endObject().bytes()) .get(); - assertBusy(() -> { - GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) - .setIds("_id") - .get(); - assertThat(response.isFound(), is(true)); - assertThat(response.pipelines().size(), equalTo(1)); - assertThat(response.pipelines().get(0).getId(), equalTo("_id")); - }); + GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) + .setIds("_id") + .get(); + assertThat(getResponse.isFound(), is(true)); + assertThat(getResponse.pipelines().size(), equalTo(1)); + assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); createIndex("test"); XContentBuilder updateMappingBuilder = jsonBuilder().startObject().startObject("properties") @@ -222,23 +215,19 @@ public class IngestClientIT extends ESIntegTestCase { .putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id") .get(); - assertBusy(() -> { - Map doc = client().prepareGet("test", "type", "1") - .get().getSourceAsMap(); - assertThat(doc.get("val"), equalTo(123.42)); - assertThat(doc.get("status"), equalTo(400)); - assertThat(doc.get("msg"), equalTo("foo")); - }); + Map doc = client().prepareGet("test", "type", "1") + .get().getSourceAsMap(); + assertThat(doc.get("val"), equalTo(123.42)); + assertThat(doc.get("status"), equalTo(400)); + assertThat(doc.get("msg"), equalTo("foo")); client().prepareBulk().add( client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 ") ).putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id").get(); - assertBusy(() -> { - Map doc = client().prepareGet("test", "type", "2").get().getSourceAsMap(); - assertThat(doc.get("val"), equalTo(123.42)); - assertThat(doc.get("status"), equalTo(400)); - assertThat(doc.get("msg"), equalTo("foo")); - }); + doc = client().prepareGet("test", "type", "2").get().getSourceAsMap(); + assertThat(doc.get("val"), equalTo(123.42)); + assertThat(doc.get("status"), equalTo(400)); + assertThat(doc.get("msg"), equalTo("foo")); DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE) .setId("_id") @@ -246,13 +235,11 @@ public class IngestClientIT extends ESIntegTestCase { assertThat(response.isFound(), is(true)); assertThat(response.getId(), equalTo("_id")); - assertBusy(() -> { - GetPipelineResponse response1 = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) - .setIds("_id") - .get(); - assertThat(response1.isFound(), is(false)); - assertThat(response1.pipelines().size(), equalTo(0)); - }); + getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE) + .setIds("_id") + .get(); + assertThat(getResponse.isFound(), is(false)); + assertThat(getResponse.pipelines().size(), equalTo(0)); } @Override diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineBootstrapperTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineBootstrapperTests.java new file mode 100644 index 00000000000..bcf5e94f5cc --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineBootstrapperTests.java @@ -0,0 +1,253 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.internal.InternalSearchHit; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.Is.is; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class PipelineBootstrapperTests extends ESTestCase { + + private PipelineStore store; + private PipelineStoreBootstrapper bootstrapper; + + @Before + public void init() { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor(any())).thenReturn(Runnable::run); + ClusterService clusterService = mock(ClusterService.class); + store = mock(PipelineStore.class); + when(store.isStarted()).thenReturn(false); + PipelineExecutionService pipelineExecutionService = mock(PipelineExecutionService.class); + bootstrapper = new PipelineStoreBootstrapper(Settings.EMPTY, threadPool, clusterService, store, pipelineExecutionService); + } + + public void testStartAndStopInBackground() throws Exception { + ThreadPool threadPool = new ThreadPool("test"); + Client client = mock(Client.class); + TransportService transportService = mock(TransportService.class); + + ClusterService clusterService = mock(ClusterService.class); + when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList())); + when(client.searchScroll(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList())); + Settings settings = Settings.EMPTY; + PipelineStore store = new PipelineStore(settings, clusterService, transportService); + PipelineStoreBootstrapper bootstrapper = new PipelineStoreBootstrapper( + settings, threadPool, clusterService, store, null + ); + bootstrapper.setClient(client); + + List hits = new ArrayList<>(); + hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap()) + .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) + ); + when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(hits)); + when(client.get(any())).thenReturn(PipelineStoreTests.expectedGetResponse(true)); + + try { + store.get("1"); + fail("IllegalStateException expected"); + } catch (IllegalStateException e) { + assertThat(e.getMessage(), equalTo("pipeline store isn't ready yet")); + } + + bootstrapper.startPipelineStore(); + assertBusy(() -> { + assertThat(store.isStarted(), is(true)); + assertThat(store.get("1"), notNullValue()); + assertThat(store.get("1").getId(), equalTo("1")); + assertThat(store.get("1").getDescription(), equalTo("_description1")); + }); + + bootstrapper.stopPipelineStore("testing stop"); + assertBusy(() -> assertThat(store.isStarted(), is(false))); + + // the map internal search hit holds gets emptied after use, which is ok, but in this test we need to reset the source: + hits.get(0).sourceRef(new BytesArray("{\"description\": \"_description1\"}")); + hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap()) + .sourceRef(new BytesArray("{\"description\": \"_description2\"}")) + ); + bootstrapper.startPipelineStore(); + assertBusy(() -> { + assertThat(store.isStarted(), is(true)); + assertThat(store.get("1"), notNullValue()); + assertThat(store.get("1").getId(), equalTo("1")); + assertThat(store.get("1").getDescription(), equalTo("_description1")); + assertThat(store.get("2"), notNullValue()); + assertThat(store.get("2").getId(), equalTo("2")); + assertThat(store.get("2").getDescription(), equalTo("_description2")); + }); + threadPool.shutdown(); + } + + public void testPipelineStoreBootstrappingGlobalStateNotRecoveredBlock() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + csBuilder.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)); + ClusterState cs = csBuilder.metaData(MetaData.builder()).build(); + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, never()).start(); + verify(store, never()).stop(anyString()); + } + + public void testPipelineStoreBootstrappingGlobalStateNoMasterBlock() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + csBuilder.blocks(ClusterBlocks.builder() + .addGlobalBlock(randomBoolean() ? DiscoverySettings.NO_MASTER_BLOCK_WRITES : DiscoverySettings.NO_MASTER_BLOCK_ALL)); + ClusterState cs = csBuilder.metaData(MetaData.builder()).build(); + + // We're not started and there is a no master block, doing nothing: + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, never()).start(); + verify(store, never()).stop(anyString()); + + // We're started and there is a no master block, so we stop the store: + when(store.isStarted()).thenReturn(true); + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, never()).start(); + verify(store, times(1)).stop(anyString()); + } + + public void testPipelineStoreBootstrappingNoIngestIndex() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + ClusterState cs = csBuilder.metaData(MetaData.builder()).build(); + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, times(1)).start(); + } + + public void testPipelineStoreBootstrappingIngestIndexShardsNotStarted() throws Exception { + // .ingest index, but not all primary shards started: + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0)) + .addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + ClusterState cs = csBuilder.build(); + + // We're not running and the cluster state isn't ready, so we don't start. + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, never()).start(); + verify(store, never()).stop(anyString()); + + // We're running and the cluster state indicates that all our shards are unassigned, so we stop. + when(store.isStarted()).thenReturn(true); + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, never()).start(); + verify(store, times(1)).stop(anyString()); + } + + public void testPipelineStoreBootstrappingIngestIndexShardsStarted() throws Exception { + // .ingest index, but not all primary shards started: + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0)) + .addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + ClusterState cs = csBuilder.build(); + + // We're not running and the cluster state is ready, so we start. + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, times(1)).start(); + verify(store, never()).stop(anyString()); + + // We're running and the cluster state is good, so we do nothing. + when(store.isStarted()).thenReturn(true); + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, times(1)).start(); + verify(store, never()).stop(anyString()); + } + + public void testPipelineStoreBootstrappingFailure() throws Exception { + // .ingest index, but not all primary shards started: + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0)) + .addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + ClusterState cs = csBuilder.build(); + + // fail the first call with an runtime exception and subsequent calls just return: + doThrow(new RuntimeException()).doNothing().when(store).start(); + bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs)); + verify(store, times(2)).start(); + verify(store, never()).stop(anyString()); + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java index 51c1e877de4..90972ac41c2 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java @@ -31,14 +31,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.env.Environment; import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.junit.After; +import org.elasticsearch.transport.TransportService; import org.junit.Before; import org.mockito.ArgumentMatcher; import org.mockito.Matchers; @@ -51,7 +48,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -59,29 +55,25 @@ import static org.mockito.Mockito.when; public class PipelineStoreTests extends ESTestCase { - private ThreadPool threadPool; private PipelineStore store; private Client client; @Before - public void init() { - threadPool = new ThreadPool("test"); - client = mock(Client.class); - + public void init() throws Exception { + Settings settings = Settings.EMPTY; ClusterService clusterService = mock(ClusterService.class); - ScriptService scriptService = mock(ScriptService.class); - when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); - Environment environment = mock(Environment.class); - store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, () -> scriptService, Collections.emptyMap()); - } + TransportService transportService = mock(TransportService.class); - @After - public void cleanup() { - threadPool.shutdown(); + client = mock(Client.class); + when(client.search(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); + when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); + store = new PipelineStore(settings, clusterService, transportService); + store.setClient(client); + store.start(); } public void testUpdatePipeline() throws Exception { - List hits = new ArrayList<>(); + List hits = new ArrayList<>(); hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap()) .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) ); @@ -112,38 +104,9 @@ public class PipelineStoreTests extends ESTestCase { assertThat(store.get("2"), nullValue()); } - public void testPipelineUpdater() throws Exception { - List hits = new ArrayList<>(); - hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) - ); - when(client.search(any())).thenReturn(expectedSearchReponse(hits)); - when(client.get(any())).thenReturn(expectedGetResponse(true)); - assertThat(store.get("1"), nullValue()); - - store.startUpdateWorker(); - assertBusy(() -> { - assertThat(store.get("1"), notNullValue()); - assertThat(store.get("1").getId(), equalTo("1")); - assertThat(store.get("1").getDescription(), equalTo("_description1")); - }); - - hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap()) - .sourceRef(new BytesArray("{\"description\": \"_description2\"}")) - ); - assertBusy(() -> { - assertThat(store.get("1"), notNullValue()); - assertThat(store.get("1").getId(), equalTo("1")); - assertThat(store.get("1").getDescription(), equalTo("_description1")); - assertThat(store.get("2"), notNullValue()); - assertThat(store.get("2").getId(), equalTo("2")); - assertThat(store.get("2").getDescription(), equalTo("_description2")); - }); - } - public void testGetReference() throws Exception { // fill the store up for the test: - List hits = new ArrayList<>(); + List hits = new ArrayList<>(); hits.add(new InternalSearchHit(0, "foo", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); hits.add(new InternalSearchHit(0, "bar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); hits.add(new InternalSearchHit(0, "foobar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); @@ -183,7 +146,7 @@ public class PipelineStoreTests extends ESTestCase { assertThat(result.get(1).getPipeline().getId(), equalTo("bar")); } - ActionFuture expectedSearchReponse(List hits) { + static ActionFuture expectedSearchReponse(List hits) { return new PlainActionFuture() { @Override @@ -194,7 +157,7 @@ public class PipelineStoreTests extends ESTestCase { }; } - ActionFuture expectedGetResponse(boolean exists) { + static ActionFuture expectedGetResponse(boolean exists) { return new PlainActionFuture() { @Override public GetResponse get() throws InterruptedException, ExecutionException { @@ -203,7 +166,7 @@ public class PipelineStoreTests extends ESTestCase { }; } - GetRequest eqGetRequest(String index, String type, String id) { + static GetRequest eqGetRequest(String index, String type, String id) { return Matchers.argThat(new GetRequestMatcher(index, type, id)); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index 569cf32ac72..d007a5ca58b 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineExecutionService; import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; @@ -60,7 +61,9 @@ public class IngestActionFilterTests extends ESTestCase { @Before public void setup() { executionService = mock(PipelineExecutionService.class); - filter = new IngestActionFilter(Settings.EMPTY, executionService); + PipelineStoreBootstrapper bootstrapper = mock(PipelineStoreBootstrapper.class); + when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); + filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); } public void testApplyNoIngestId() throws Exception { @@ -181,7 +184,9 @@ public class IngestActionFilterTests extends ESTestCase { }; when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); executionService = new PipelineExecutionService(store, threadPool); - filter = new IngestActionFilter(Settings.EMPTY, executionService); + PipelineStoreBootstrapper bootstrapper = mock(PipelineStoreBootstrapper.class); + when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); + filter = new IngestActionFilter(Settings.EMPTY, bootstrapper); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id"); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/reload/ReloadPipelinesActionTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/reload/ReloadPipelinesActionTests.java new file mode 100644 index 00000000000..32467cfe555 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/reload/ReloadPipelinesActionTests.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport.reload; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.hamcrest.CoreMatchers.is; +import static org.mockito.Mockito.when; + +public class ReloadPipelinesActionTests extends ESTestCase { + + private ClusterService clusterService; + private TransportService transportService; + private ReloadPipelinesAction reloadPipelinesAction; + + @Before + public void init() { + Settings settings = Settings.EMPTY; + PipelineStore pipelineStore = mock(PipelineStore.class); + clusterService = mock(ClusterService.class); + transportService = mock(TransportService.class); + reloadPipelinesAction = new ReloadPipelinesAction(settings, pipelineStore, clusterService, transportService); + } + + public void testSuccess() { + int numNodes = randomIntBetween(1, 10); + int numIngestNodes = 0; + + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + for (int i = 0; i < numNodes; i++) { + boolean ingestNode = i == 0 || randomBoolean(); + DiscoveryNode discoNode = generateDiscoNode(i, ingestNode); + discoNodes.put(discoNode); + if (ingestNode) { + numIngestNodes++; + } + } + ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build(); + when(clusterService.state()).thenReturn(state); + + final int finalNumIngestNodes = numIngestNodes; + doAnswer(mock -> { + TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3]; + for (int i = 0; i < finalNumIngestNodes; i++) { + handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse()); + } + return mock; + }).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any()); + reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(true))); + } + + public void testWithAtLeastOneFailure() { + int numNodes = randomIntBetween(1, 10); + int numIngestNodes = 0; + + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + for (int i = 0; i < numNodes; i++) { + boolean ingestNode = i == 0 || randomBoolean(); + DiscoveryNode discoNode = generateDiscoNode(i, ingestNode); + discoNodes.put(discoNode); + if (ingestNode) { + numIngestNodes++; + } + } + ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build(); + when(clusterService.state()).thenReturn(state); + + final int finalNumIngestNodes = numIngestNodes; + doAnswer(mock -> { + TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3]; + handler.handleException(new TransportException("test failure")); + for (int i = 1; i < finalNumIngestNodes; i++) { + if (randomBoolean()) { + handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse()); + } else { + handler.handleException(new TransportException("test failure")); + } + } + return mock; + }).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any()); + reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false))); + } + + public void testNoIngestNodes() { + // expected exception if there are no nodes: + DiscoveryNodes discoNodes = DiscoveryNodes.builder() + .build(); + ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build(); + when(clusterService.state()).thenReturn(state); + + try { + reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked")); + fail("exception expected"); + } catch (IllegalStateException e) { + assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster")); + } + + // expected exception if there are no ingest nodes: + discoNodes = DiscoveryNodes.builder() + .put(new DiscoveryNode("_name", "_id", new LocalTransportAddress("_id"), Collections.singletonMap("ingest", "false"), Version.CURRENT)) + .build(); + state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build(); + when(clusterService.state()).thenReturn(state); + + try { + reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked")); + fail("exception expected"); + } catch (IllegalStateException e) { + assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster")); + } + } + + private DiscoveryNode generateDiscoNode(int index, boolean ingestNode) { + Map attributes; + if (ingestNode) { + attributes = Collections.singletonMap("ingest", "true"); + } else { + attributes = Collections.emptyMap(); + } + String id = String.valueOf(index); + return new DiscoveryNode(id, id, new LocalTransportAddress(id), attributes, Version.CURRENT); + } + +} diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml index 9cf5d50a68f..7dd83313a7f 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/20_crud.yaml @@ -1,9 +1,5 @@ --- "Test basic pipeline crud": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -24,14 +20,6 @@ - match: { _version: 1 } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.get_pipeline: id: "my_pipeline" @@ -47,14 +35,6 @@ - match: { _id: "my_pipeline" } - match: { found: true } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: catch: missing ingest.get_pipeline: @@ -62,10 +42,6 @@ --- "Test invalid config": - - do: - cluster.health: - wait_for_status: green - - do: catch: param ingest.put_pipeline: diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/30_grok.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/30_grok.yaml index e0f97b625b1..7807631344a 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/30_grok.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/30_grok.yaml @@ -1,9 +1,5 @@ --- "Test Grok Pipeline": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -21,14 +17,6 @@ } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.index: index: test @@ -48,10 +36,6 @@ --- "Test Grok Pipeline With Custom Pattern": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -97,10 +81,6 @@ --- "Test Grok Pipeline With Custom Pattern Sharing Same Name As Another": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/40_geoip_processor.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/40_geoip_processor.yaml index 3c0efc19c78..d35898a59d5 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/40_geoip_processor.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/40_geoip_processor.yaml @@ -1,9 +1,5 @@ --- "Test geoip processor with defaults": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -20,14 +16,6 @@ } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.index: index: test @@ -72,14 +60,6 @@ } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.index: index: test @@ -108,10 +88,6 @@ --- "Test geoip processor with different database file": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -129,14 +105,6 @@ } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.index: index: test diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/50_date_processor.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/50_date_processor.yaml index f2c6d2c0cce..5caad399d23 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/50_date_processor.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/50_date_processor.yaml @@ -1,9 +1,5 @@ --- "Test date processor": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -23,14 +19,6 @@ } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.index: index: test diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml index ca0f58435df..75cef2971c0 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml @@ -1,9 +1,5 @@ --- "Test mutate processors": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -72,14 +68,6 @@ } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.index: index: test diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml index edbd9494088..641fecf16f1 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml @@ -1,9 +1,5 @@ --- "Test simulate with stored ingest pipeline": - - do: - cluster.health: - wait_for_status: green - - do: ingest.put_pipeline: id: "my_pipeline" @@ -21,14 +17,6 @@ } - match: { _id: "my_pipeline" } - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - do: ingest.simulate: id: "my_pipeline" @@ -53,10 +41,6 @@ --- "Test simulate with provided pipeline definition": - - do: - cluster.health: - wait_for_status: green - - do: ingest.simulate: body: > @@ -87,10 +71,6 @@ --- "Test simulate with no provided pipeline or pipeline_id": - - do: - cluster.health: - wait_for_status: green - - do: catch: request ingest.simulate: @@ -114,10 +94,6 @@ --- "Test simulate with verbose flag": - - do: - cluster.health: - wait_for_status: green - - do: ingest.simulate: verbose: true @@ -168,10 +144,6 @@ --- "Test simulate with exception thrown": - - do: - cluster.health: - wait_for_status: green - - do: ingest.simulate: body: > @@ -213,10 +185,6 @@ --- "Test verbose simulate with exception thrown": - - do: - cluster.health: - wait_for_status: green - - do: ingest.simulate: verbose: true