Added an internal reload pipeline api that makes sure pipeline changes are visible on all ingest nodes after modifcations have been made.

* Pipeline store can now only start when there is no .ingest index or all primary shards of .ingest have been started
* IngestPlugin adds`node.ingest` setting to `true`. This is used to figure out to what nodes to send the refresh request too. This setting isn't yet configurable. This will be done in a follow up issue.
* Removed the background pipeline updater and added added logic to deal with specific scenarious to reload all pipelines.
* Ingest services are no longer be managed by Guice. Only the bootstrapper gets managed by guice and that contructs
all the services/components ingest will need.
This commit is contained in:
Martijn van Groningen 2015-12-03 00:03:31 +01:00
parent e8a8e22e09
commit 6dfcee6937
24 changed files with 872 additions and 339 deletions

View File

@ -527,8 +527,8 @@ PUT _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE
NOTE: Each ingest node updates its processors asynchronously in the background, so it may take a few seconds for all
nodes to have the latest version of the pipeline.
NOTE: The put pipeline api also instructs all ingest nodes to reload their in-memory representation of pipelines, so that
pipeline changes take immediately in effect.
==== Get pipeline API

View File

@ -35,7 +35,6 @@ import org.elasticsearch.ingest.processor.split.SplitProcessor;
import org.elasticsearch.ingest.processor.trim.TrimProcessor;
import org.elasticsearch.ingest.processor.uppercase.UppercaseProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService;
import java.util.HashMap;
import java.util.Map;
@ -47,9 +46,7 @@ public class IngestModule extends AbstractModule {
@Override
protected void configure() {
binder().bind(IngestRestFilter.class).asEagerSingleton();
binder().bind(PipelineExecutionService.class).asEagerSingleton();
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(SimulateExecutionService.class).asEagerSingleton();
binder().bind(PipelineStoreBootstrapper.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));

View File

@ -17,7 +17,6 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionModule;
@ -54,6 +53,7 @@ public class IngestPlugin extends Plugin {
public static final String PIPELINE_ID_PARAM = "pipeline_id";
public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
public static final String NAME = "ingest";
public static final String NODE_INGEST_SETTING = "node.ingest";
private final Settings nodeSettings;
private final boolean transportClient;
@ -87,7 +87,7 @@ public class IngestPlugin extends Plugin {
if (transportClient) {
return Collections.emptyList();
} else {
return Collections.singletonList(PipelineStore.class);
return Collections.singletonList(PipelineStoreBootstrapper.class);
}
}
@ -95,6 +95,8 @@ public class IngestPlugin extends Plugin {
public Settings additionalSettings() {
return settingsBuilder()
.put(PipelineExecutionService.additionalSettings(nodeSettings))
// TODO: in a followup issue this should be made configurable
.put(NODE_INGEST_SETTING, true)
.build();
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.ingest.IngestDocument;
@ -37,7 +36,6 @@ public class PipelineExecutionService {
private final PipelineStore store;
private final ThreadPool threadPool;
@Inject
public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) {
this.store = store;
this.threadPool = threadPool;

View File

@ -30,76 +30,59 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.SearchScrollIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.script.*;
import org.elasticsearch.plugin.ingest.transport.reload.ReloadPipelinesAction;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
public class PipelineStore extends AbstractLifecycleComponent {
public class PipelineStore extends AbstractComponent implements Closeable {
public final static String INDEX = ".ingest";
public final static String TYPE = "pipeline";
private final ThreadPool threadPool;
private final Environment environment;
private Client client;
private final TimeValue scrollTimeout;
private final ClusterService clusterService;
private final Provider<Client> clientProvider;
private final TimeValue pipelineUpdateInterval;
private final Provider<ScriptService> scriptServiceProvider;
private final ReloadPipelinesAction reloadPipelinesAction;
private final Pipeline.Factory factory = new Pipeline.Factory();
private volatile Map<String, Processor.Factory> processorFactoryRegistry;
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders;
private Map<String, Processor.Factory> processorFactoryRegistry;
private volatile Client client;
private volatile boolean started = false;
private volatile Map<String, PipelineDefinition> pipelines = new HashMap<>();
@Inject
public PipelineStore(Settings settings, Provider<Client> clientProvider, ThreadPool threadPool,
Environment environment, ClusterService clusterService, Provider<ScriptService> scriptServiceProvider,
Map<String, ProcessorFactoryProvider> processorFactoryProviders) {
public PipelineStore(Settings settings, ClusterService clusterService, TransportService transportService) {
super(settings);
this.threadPool = threadPool;
this.environment = environment;
this.clusterService = clusterService;
this.clientProvider = clientProvider;
this.scriptServiceProvider = scriptServiceProvider;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
this.processorFactoryProviders = processorFactoryProviders;
clusterService.add(new PipelineStoreListener());
this.reloadPipelinesAction = new ReloadPipelinesAction(settings, this, clusterService, transportService);
}
@Override
protected void doStart() {
// TODO this will be better when #15203 gets in:
public void setClient(Client client) {
this.client = client;
}
public void buildProcessorFactoryRegistry(Map<String, ProcessorFactoryProvider> processorFactoryProviders, Environment environment, ScriptService scriptService) {
Map<String, Processor.Factory> processorFactories = new HashMap<>();
TemplateService templateService = new InternalTemplateService(scriptServiceProvider.get());
TemplateService templateService = new InternalTemplateService(scriptService);
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
Processor.Factory processorFactory = entry.getValue().get(environment, templateService);
processorFactories.put(entry.getKey(), processorFactory);
@ -108,35 +91,31 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
// TODO: When org.elasticsearch.node.Node can close Closable instances we should remove this code
public void close() throws IOException {
stop("closing");
// TODO: When org.elasticsearch.node.Node can close Closable instances we should try to remove this code,
// since any wired closable should be able to close itself
List<Closeable> closeables = new ArrayList<>();
for (Processor.Factory factory : processorFactoryRegistry.values()) {
if (factory instanceof Closeable) {
closeables.add((Closeable) factory);
}
}
try {
IOUtils.close(closeables);
} catch (IOException e) {
throw new RuntimeException(e);
}
IOUtils.close(closeables);
}
/**
* Deletes the pipeline specified by id in the request.
*/
public void delete(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
ensureReady();
DeleteRequest deleteRequest = new DeleteRequest(request);
deleteRequest.index(PipelineStore.INDEX);
deleteRequest.type(PipelineStore.TYPE);
deleteRequest.id(request.id());
deleteRequest.refresh(true);
client().delete(deleteRequest, listener);
client.delete(deleteRequest, handleWriteResponseAndReloadPipelines(listener));
}
/**
@ -145,6 +124,8 @@ public class PipelineStore extends AbstractLifecycleComponent {
* @throws IllegalArgumentException If the pipeline holds incorrect configuration
*/
public void put(PutPipelineRequest request, ActionListener<IndexResponse> listener) throws IllegalArgumentException {
ensureReady();
try {
// validates the pipeline and processor configuration:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2();
@ -159,13 +140,15 @@ public class PipelineStore extends AbstractLifecycleComponent {
indexRequest.id(request.id());
indexRequest.source(request.source());
indexRequest.refresh(true);
client().index(indexRequest, listener);
client.index(indexRequest, handleWriteResponseAndReloadPipelines(listener));
}
/**
* Returns the pipeline by the specified id
*/
public Pipeline get(String id) {
ensureReady();
PipelineDefinition ref = pipelines.get(id);
if (ref != null) {
return ref.getPipeline();
@ -179,6 +162,8 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
public List<PipelineDefinition> getReference(String... ids) {
ensureReady();
List<PipelineDefinition> result = new ArrayList<>(ids.length);
for (String id : ids) {
if (Regex.isSimpleMatchPattern(id)) {
@ -197,11 +182,7 @@ public class PipelineStore extends AbstractLifecycleComponent {
return result;
}
Pipeline constructPipeline(String id, Map<String, Object> config) throws Exception {
return factory.create(id, config, processorFactoryRegistry);
}
synchronized void updatePipelines() throws Exception {
public synchronized void updatePipelines() throws Exception {
// note: this process isn't fast or smart, but the idea is that there will not be many pipelines,
// so for that reason the goal is to keep the update logic simple.
@ -210,9 +191,15 @@ public class PipelineStore extends AbstractLifecycleComponent {
for (SearchHit hit : readAllPipelines()) {
String pipelineId = hit.getId();
BytesReference pipelineSource = hit.getSourceRef();
PipelineDefinition previous = newPipelines.get(pipelineId);
if (previous != null) {
if (previous.getSource().equals(pipelineSource)) {
PipelineDefinition current = newPipelines.get(pipelineId);
if (current != null) {
// If we first read from a primary shard copy and then from a replica copy,
// and a write did not yet make it into the replica shard
// then the source is not equal but we don't update because the current pipeline is the latest:
if (current.getVersion() > hit.getVersion()) {
continue;
}
if (current.getSource().equals(pipelineSource)) {
continue;
}
}
@ -224,7 +211,7 @@ public class PipelineStore extends AbstractLifecycleComponent {
int removed = 0;
for (String existingPipelineId : pipelines.keySet()) {
if (!existPipeline(existingPipelineId)) {
if (pipelineExists(existingPipelineId) == false) {
newPipelines.remove(existingPipelineId);
removed++;
}
@ -238,17 +225,46 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
}
void startUpdateWorker() {
threadPool.schedule(pipelineUpdateInterval, ThreadPool.Names.GENERIC, new Updater());
private Pipeline constructPipeline(String id, Map<String, Object> config) throws Exception {
return factory.create(id, config, processorFactoryRegistry);
}
boolean existPipeline(String pipelineId) {
boolean pipelineExists(String pipelineId) {
GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId);
GetResponse response = client().get(request).actionGet();
return response.isExists();
try {
GetResponse response = client.get(request).actionGet();
return response.isExists();
} catch (IndexNotFoundException e) {
// the ingest index doesn't exist, so the pipeline doesn't either:
return false;
}
}
Iterable<SearchHit> readAllPipelines() {
synchronized void start() throws Exception {
if (started) {
logger.debug("Pipeline already started");
} else {
updatePipelines();
started = true;
logger.debug("Pipeline store started with [{}] pipelines", pipelines.size());
}
}
synchronized void stop(String reason) {
if (started) {
started = false;
pipelines = new HashMap<>();
logger.debug("Pipeline store stopped, reason [{}]", reason);
} else {
logger.debug("Pipeline alreadt stopped");
}
}
public boolean isStarted() {
return started;
}
private Iterable<SearchHit> readAllPipelines() {
// TODO: the search should be replaced with an ingest API when it is available
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.version(true);
@ -256,40 +272,32 @@ public class PipelineStore extends AbstractLifecycleComponent {
SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX);
searchRequest.source(sourceBuilder);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
return SearchScrollIterator.createIterator(client(), scrollTimeout, searchRequest);
return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest);
}
private Client client() {
if (client == null) {
client = clientProvider.get();
private void ensureReady() {
if (started == false) {
throw new IllegalStateException("pipeline store isn't ready yet");
}
return client;
}
class Updater implements Runnable {
@Override
public void run() {
try {
updatePipelines();
} catch (Exception e) {
logger.error("pipeline store update failure", e);
} finally {
startUpdateWorker();
@SuppressWarnings("unchecked")
private <T> ActionListener<T> handleWriteResponseAndReloadPipelines(ActionListener<T> listener) {
return new ActionListener<T>() {
@Override
public void onResponse(T result) {
try {
reloadPipelinesAction.reloadPipelinesOnAllNodes(reloadResult -> listener.onResponse(result));
} catch (Throwable e) {
listener.onFailure(e);
}
}
}
}
class PipelineStoreListener implements ClusterStateListener {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) {
startUpdateWorker();
clusterService.remove(this);
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
}
};
}
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
public class PipelineStoreBootstrapper extends AbstractLifecycleComponent implements ClusterStateListener {
private final ThreadPool threadPool;
private final Environment environment;
private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;
private final Map<String, ProcessorFactoryProvider> processorFactoryProvider;
@Inject
public PipelineStoreBootstrapper(Settings settings, ThreadPool threadPool, Environment environment,
ClusterService clusterService, TransportService transportService,
Map<String, ProcessorFactoryProvider> processorFactoryProvider) {
super(settings);
this.threadPool = threadPool;
this.environment = environment;
this.processorFactoryProvider = processorFactoryProvider;
this.pipelineStore = new PipelineStore(settings, clusterService, transportService);
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
clusterService.add(this);
}
// for testing:
PipelineStoreBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService,
PipelineStore pipelineStore, PipelineExecutionService pipelineExecutionService) {
super(settings);
this.threadPool = threadPool;
this.environment = null;
clusterService.add(this);
this.pipelineStore = pipelineStore;
this.pipelineExecutionService = pipelineExecutionService;
this.processorFactoryProvider = null;
}
public PipelineStore getPipelineStore() {
return pipelineStore;
}
public PipelineExecutionService getPipelineExecutionService() {
return pipelineExecutionService;
}
@Inject
public void setClient(Client client) {
pipelineStore.setClient(client);
}
@Inject
public void setScriptService(ScriptService scriptService) {
pipelineStore.buildProcessorFactoryRegistry(processorFactoryProvider, environment, scriptService);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return;
}
if (pipelineStore.isStarted()) {
if (validClusterState(event.state()) == false) {
stopPipelineStore("cluster state invalid [" + event.state() + "]");
}
} else {
if (validClusterState(event.state())) {
startPipelineStore();
}
}
}
boolean validClusterState(ClusterState state) {
if (state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES) ||
state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL)) {
return false;
}
if (state.getMetaData().hasConcreteIndex(PipelineStore.INDEX)) {
IndexRoutingTable routingTable = state.getRoutingTable().index(PipelineStore.INDEX);
return routingTable.allPrimaryShardsActive();
} else {
// it will be ready when auto create index kicks in before the first pipeline doc gets added
return true;
}
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
try {
pipelineStore.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
void startPipelineStore() {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
pipelineStore.start();
} catch (Exception e) {
logger.warn("pipeline store failed to start, retrying...", e);
startPipelineStore();
}
});
}
void stopPipelineStore(String reason) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
pipelineStore.stop(reason);
} catch (Exception e) {
logger.error("pipeline store stop failure", e);
}
});
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper;
import java.util.*;
@ -41,9 +42,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
private final PipelineExecutionService executionService;
@Inject
public IngestActionFilter(Settings settings, PipelineExecutionService executionService) {
public IngestActionFilter(Settings settings, PipelineStoreBootstrapper bootstrapper) {
super(settings);
this.executionService = executionService;
this.executionService = bootstrapper.getPipelineExecutionService();
}
@Override

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -35,9 +36,9 @@ public class DeletePipelineTransportAction extends HandledTransportAction<Delete
private final PipelineStore pipelineStore;
@Inject
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) {
super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
this.pipelineStore = pipelineStore;
this.pipelineStore = bootstrapper.getPipelineStore();
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -40,9 +41,9 @@ public class GetPipelineTransportAction extends HandledTransportAction<GetPipeli
private final PipelineStore pipelineStore;
@Inject
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) {
super(settings, GetPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
this.pipelineStore = pipelineStore;
this.pipelineStore = bootstrapper.getPipelineStore();
}
@Override

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -41,9 +42,9 @@ public class PutPipelineTransportAction extends HandledTransportAction<PutPipeli
private final PipelineStore pipelineStore;
@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) {
super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.pipelineStore = pipelineStore;
this.pipelineStore = bootstrapper.getPipelineStore();
}
@Override

View File

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.reload;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* An internal api that refreshes the in-memory representation of all the pipelines on all ingest nodes.
*/
public class ReloadPipelinesAction extends AbstractComponent implements TransportRequestHandler<ReloadPipelinesAction.ReloadPipelinesRequest> {
public static final String ACTION_NAME = "internal:admin/ingest/reload/pipelines";
private final ClusterService clusterService;
private final TransportService transportService;
private final PipelineStore pipelineStore;
public ReloadPipelinesAction(Settings settings, PipelineStore pipelineStore, ClusterService clusterService, TransportService transportService) {
super(settings);
this.pipelineStore = pipelineStore;
this.clusterService = clusterService;
this.transportService = transportService;
transportService.registerRequestHandler(ACTION_NAME, ReloadPipelinesRequest::new, ThreadPool.Names.SAME, this);
}
public void reloadPipelinesOnAllNodes(Consumer<Boolean> listener) {
List<DiscoveryNode> ingestNodes = new ArrayList<>();
for (DiscoveryNode node : clusterService.state().getNodes()) {
String nodeEnabled = node.getAttributes().get("ingest");
if ("true".equals(nodeEnabled)) {
ingestNodes.add(node);
}
}
if (ingestNodes.isEmpty()) {
throw new IllegalStateException("There are no ingest nodes in this cluster");
}
AtomicBoolean failed = new AtomicBoolean();
AtomicInteger expectedResponses = new AtomicInteger(ingestNodes.size());
for (DiscoveryNode node : ingestNodes) {
ReloadPipelinesRequest nodeRequest = new ReloadPipelinesRequest();
transportService.sendRequest(node, ACTION_NAME, nodeRequest, new TransportResponseHandler<ReloadPipelinesResponse>() {
@Override
public ReloadPipelinesResponse newInstance() {
return new ReloadPipelinesResponse();
}
@Override
public void handleResponse(ReloadPipelinesResponse response) {
decrementAndReturn();
}
@Override
public void handleException(TransportException exp) {
logger.warn("failed to update pipelines on remote node [{}]", exp, node);
failed.set(true);
decrementAndReturn();
}
void decrementAndReturn() {
if (expectedResponses.decrementAndGet() == 0) {
listener.accept(!failed.get());
}
}
@Override
public String executor() {
return ThreadPool.Names.MANAGEMENT;
}
});
}
}
@Override
public void messageReceived(ReloadPipelinesRequest request, TransportChannel channel) throws Exception {
try {
pipelineStore.updatePipelines();
channel.sendResponse(new ReloadPipelinesResponse());
} catch (Throwable e) {
logger.warn("failed to update pipelines", e);
channel.sendResponse(e);
}
}
final static class ReloadPipelinesRequest extends TransportRequest {
}
final static class ReloadPipelinesResponse extends TransportResponse {
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
@ -29,14 +28,13 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
public class SimulateExecutionService {
class SimulateExecutionService {
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
private final ThreadPool threadPool;
@Inject
public SimulateExecutionService(ThreadPool threadPool) {
SimulateExecutionService(ThreadPool threadPool) {
this.threadPool = threadPool;
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -34,14 +35,15 @@ import java.io.IOException;
import java.util.Map;
public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
private final PipelineStore pipelineStore;
private final SimulateExecutionService executionService;
@Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore, SimulateExecutionService executionService) {
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStoreBootstrapper bootstrapper) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
this.pipelineStore = pipelineStore;
this.executionService = executionService;
this.pipelineStore = bootstrapper.getPipelineStore();
this.executionService = new SimulateExecutionService(threadPool);
}
@Override

View File

@ -81,17 +81,12 @@ public class IngestClientIT extends ESIntegTestCase {
.endArray()
.endObject().bytes())
.get();
assertBusy(new Runnable() {
@Override
public void run() {
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(response.isFound(), is(true));
assertThat(response.pipelines().size(), equalTo(1));
assertThat(response.pipelines().get(0).getId(), equalTo("_id"));
}
});
GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(getResponse.isFound(), is(true));
assertThat(getResponse.pipelines().size(), equalTo(1));
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
SimulatePipelineResponse response = new SimulatePipelineRequestBuilder(client(), SimulatePipelineAction.INSTANCE)
.setId("_id")
@ -200,14 +195,12 @@ public class IngestClientIT extends ESIntegTestCase {
.endArray()
.endObject().bytes())
.get();
assertBusy(() -> {
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(response.isFound(), is(true));
assertThat(response.pipelines().size(), equalTo(1));
assertThat(response.pipelines().get(0).getId(), equalTo("_id"));
});
GetPipelineResponse getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(getResponse.isFound(), is(true));
assertThat(getResponse.pipelines().size(), equalTo(1));
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
createIndex("test");
XContentBuilder updateMappingBuilder = jsonBuilder().startObject().startObject("properties")
@ -222,23 +215,19 @@ public class IngestClientIT extends ESIntegTestCase {
.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id")
.get();
assertBusy(() -> {
Map<String, Object> doc = client().prepareGet("test", "type", "1")
.get().getSourceAsMap();
assertThat(doc.get("val"), equalTo(123.42));
assertThat(doc.get("status"), equalTo(400));
assertThat(doc.get("msg"), equalTo("foo"));
});
Map<String, Object> doc = client().prepareGet("test", "type", "1")
.get().getSourceAsMap();
assertThat(doc.get("val"), equalTo(123.42));
assertThat(doc.get("status"), equalTo(400));
assertThat(doc.get("msg"), equalTo("foo"));
client().prepareBulk().add(
client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 <foo>")
).putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id").get();
assertBusy(() -> {
Map<String, Object> doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
assertThat(doc.get("val"), equalTo(123.42));
assertThat(doc.get("status"), equalTo(400));
assertThat(doc.get("msg"), equalTo("foo"));
});
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
assertThat(doc.get("val"), equalTo(123.42));
assertThat(doc.get("status"), equalTo(400));
assertThat(doc.get("msg"), equalTo("foo"));
DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
.setId("_id")
@ -246,13 +235,11 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(response.isFound(), is(true));
assertThat(response.getId(), equalTo("_id"));
assertBusy(() -> {
GetPipelineResponse response1 = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(response1.isFound(), is(false));
assertThat(response1.pipelines().size(), equalTo(0));
});
getResponse = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(getResponse.isFound(), is(false));
assertThat(getResponse.pipelines().size(), equalTo(0));
}
@Override

View File

@ -0,0 +1,253 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
public class PipelineBootstrapperTests extends ESTestCase {
private PipelineStore store;
private PipelineStoreBootstrapper bootstrapper;
@Before
public void init() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(any())).thenReturn(Runnable::run);
ClusterService clusterService = mock(ClusterService.class);
store = mock(PipelineStore.class);
when(store.isStarted()).thenReturn(false);
PipelineExecutionService pipelineExecutionService = mock(PipelineExecutionService.class);
bootstrapper = new PipelineStoreBootstrapper(Settings.EMPTY, threadPool, clusterService, store, pipelineExecutionService);
}
public void testStartAndStopInBackground() throws Exception {
ThreadPool threadPool = new ThreadPool("test");
Client client = mock(Client.class);
TransportService transportService = mock(TransportService.class);
ClusterService clusterService = mock(ClusterService.class);
when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
when(client.searchScroll(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
Settings settings = Settings.EMPTY;
PipelineStore store = new PipelineStore(settings, clusterService, transportService);
PipelineStoreBootstrapper bootstrapper = new PipelineStoreBootstrapper(
settings, threadPool, clusterService, store, null
);
bootstrapper.setClient(client);
List<InternalSearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(hits));
when(client.get(any())).thenReturn(PipelineStoreTests.expectedGetResponse(true));
try {
store.get("1");
fail("IllegalStateException expected");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), equalTo("pipeline store isn't ready yet"));
}
bootstrapper.startPipelineStore();
assertBusy(() -> {
assertThat(store.isStarted(), is(true));
assertThat(store.get("1"), notNullValue());
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
});
bootstrapper.stopPipelineStore("testing stop");
assertBusy(() -> assertThat(store.isStarted(), is(false)));
// the map internal search hit holds gets emptied after use, which is ok, but in this test we need to reset the source:
hits.get(0).sourceRef(new BytesArray("{\"description\": \"_description1\"}"));
hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
);
bootstrapper.startPipelineStore();
assertBusy(() -> {
assertThat(store.isStarted(), is(true));
assertThat(store.get("1"), notNullValue());
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
assertThat(store.get("2"), notNullValue());
assertThat(store.get("2").getId(), equalTo("2"));
assertThat(store.get("2").getDescription(), equalTo("_description2"));
});
threadPool.shutdown();
}
public void testPipelineStoreBootstrappingGlobalStateNotRecoveredBlock() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
csBuilder.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK));
ClusterState cs = csBuilder.metaData(MetaData.builder()).build();
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, never()).stop(anyString());
}
public void testPipelineStoreBootstrappingGlobalStateNoMasterBlock() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
csBuilder.blocks(ClusterBlocks.builder()
.addGlobalBlock(randomBoolean() ? DiscoverySettings.NO_MASTER_BLOCK_WRITES : DiscoverySettings.NO_MASTER_BLOCK_ALL));
ClusterState cs = csBuilder.metaData(MetaData.builder()).build();
// We're not started and there is a no master block, doing nothing:
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, never()).stop(anyString());
// We're started and there is a no master block, so we stop the store:
when(store.isStarted()).thenReturn(true);
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, times(1)).stop(anyString());
}
public void testPipelineStoreBootstrappingNoIngestIndex() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
ClusterState cs = csBuilder.metaData(MetaData.builder()).build();
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, times(1)).start();
}
public void testPipelineStoreBootstrappingIngestIndexShardsNotStarted() throws Exception {
// .ingest index, but not all primary shards started:
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0))
.addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build();
// We're not running and the cluster state isn't ready, so we don't start.
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, never()).stop(anyString());
// We're running and the cluster state indicates that all our shards are unassigned, so we stop.
when(store.isStarted()).thenReturn(true);
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, times(1)).stop(anyString());
}
public void testPipelineStoreBootstrappingIngestIndexShardsStarted() throws Exception {
// .ingest index, but not all primary shards started:
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0))
.addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build();
// We're not running and the cluster state is ready, so we start.
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, times(1)).start();
verify(store, never()).stop(anyString());
// We're running and the cluster state is good, so we do nothing.
when(store.isStarted()).thenReturn(true);
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, times(1)).start();
verify(store, never()).stop(anyString());
}
public void testPipelineStoreBootstrappingFailure() throws Exception {
// .ingest index, but not all primary shards started:
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(PipelineStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(PipelineStore.INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(PipelineStore.INDEX, 0))
.addShard(TestShardRouting.newShardRouting(PipelineStore.INDEX, 0, "_node_id", null, null, true, ShardRoutingState.STARTED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build();
// fail the first call with an runtime exception and subsequent calls just return:
doThrow(new RuntimeException()).doNothing().when(store).start();
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, times(2)).start();
verify(store, never()).stop(anyString());
}
}

View File

@ -31,14 +31,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
@ -51,7 +48,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@ -59,29 +55,25 @@ import static org.mockito.Mockito.when;
public class PipelineStoreTests extends ESTestCase {
private ThreadPool threadPool;
private PipelineStore store;
private Client client;
@Before
public void init() {
threadPool = new ThreadPool("test");
client = mock(Client.class);
public void init() throws Exception {
Settings settings = Settings.EMPTY;
ClusterService clusterService = mock(ClusterService.class);
ScriptService scriptService = mock(ScriptService.class);
when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
Environment environment = mock(Environment.class);
store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, () -> scriptService, Collections.emptyMap());
}
TransportService transportService = mock(TransportService.class);
@After
public void cleanup() {
threadPool.shutdown();
client = mock(Client.class);
when(client.search(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
store = new PipelineStore(settings, clusterService, transportService);
store.setClient(client);
store.start();
}
public void testUpdatePipeline() throws Exception {
List<SearchHit> hits = new ArrayList<>();
List<InternalSearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
@ -112,38 +104,9 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(store.get("2"), nullValue());
}
public void testPipelineUpdater() throws Exception {
List<SearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(client.search(any())).thenReturn(expectedSearchReponse(hits));
when(client.get(any())).thenReturn(expectedGetResponse(true));
assertThat(store.get("1"), nullValue());
store.startUpdateWorker();
assertBusy(() -> {
assertThat(store.get("1"), notNullValue());
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
});
hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
);
assertBusy(() -> {
assertThat(store.get("1"), notNullValue());
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
assertThat(store.get("2"), notNullValue());
assertThat(store.get("2").getId(), equalTo("2"));
assertThat(store.get("2").getDescription(), equalTo("_description2"));
});
}
public void testGetReference() throws Exception {
// fill the store up for the test:
List<SearchHit> hits = new ArrayList<>();
List<InternalSearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "foo", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "bar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "foobar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
@ -183,7 +146,7 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(result.get(1).getPipeline().getId(), equalTo("bar"));
}
ActionFuture<SearchResponse> expectedSearchReponse(List<SearchHit> hits) {
static ActionFuture<SearchResponse> expectedSearchReponse(List<InternalSearchHit> hits) {
return new PlainActionFuture<SearchResponse>() {
@Override
@ -194,7 +157,7 @@ public class PipelineStoreTests extends ESTestCase {
};
}
ActionFuture<GetResponse> expectedGetResponse(boolean exists) {
static ActionFuture<GetResponse> expectedGetResponse(boolean exists) {
return new PlainActionFuture<GetResponse>() {
@Override
public GetResponse get() throws InterruptedException, ExecutionException {
@ -203,7 +166,7 @@ public class PipelineStoreTests extends ESTestCase {
};
}
GetRequest eqGetRequest(String index, String type, String id) {
static GetRequest eqGetRequest(String index, String type, String id) {
return Matchers.argThat(new GetRequestMatcher(index, type, id));
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.PipelineStoreBootstrapper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
@ -60,7 +61,9 @@ public class IngestActionFilterTests extends ESTestCase {
@Before
public void setup() {
executionService = mock(PipelineExecutionService.class);
filter = new IngestActionFilter(Settings.EMPTY, executionService);
PipelineStoreBootstrapper bootstrapper = mock(PipelineStoreBootstrapper.class);
when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService);
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
}
public void testApplyNoIngestId() throws Exception {
@ -181,7 +184,9 @@ public class IngestActionFilterTests extends ESTestCase {
};
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
executionService = new PipelineExecutionService(store, threadPool);
filter = new IngestActionFilter(Settings.EMPTY, executionService);
PipelineStoreBootstrapper bootstrapper = mock(PipelineStoreBootstrapper.class);
when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService);
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");

View File

@ -0,0 +1,162 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.reload;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Mockito.when;
public class ReloadPipelinesActionTests extends ESTestCase {
private ClusterService clusterService;
private TransportService transportService;
private ReloadPipelinesAction reloadPipelinesAction;
@Before
public void init() {
Settings settings = Settings.EMPTY;
PipelineStore pipelineStore = mock(PipelineStore.class);
clusterService = mock(ClusterService.class);
transportService = mock(TransportService.class);
reloadPipelinesAction = new ReloadPipelinesAction(settings, pipelineStore, clusterService, transportService);
}
public void testSuccess() {
int numNodes = randomIntBetween(1, 10);
int numIngestNodes = 0;
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (int i = 0; i < numNodes; i++) {
boolean ingestNode = i == 0 || randomBoolean();
DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
discoNodes.put(discoNode);
if (ingestNode) {
numIngestNodes++;
}
}
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
when(clusterService.state()).thenReturn(state);
final int finalNumIngestNodes = numIngestNodes;
doAnswer(mock -> {
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
for (int i = 0; i < finalNumIngestNodes; i++) {
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
}
return mock;
}).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any());
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(true)));
}
public void testWithAtLeastOneFailure() {
int numNodes = randomIntBetween(1, 10);
int numIngestNodes = 0;
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (int i = 0; i < numNodes; i++) {
boolean ingestNode = i == 0 || randomBoolean();
DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
discoNodes.put(discoNode);
if (ingestNode) {
numIngestNodes++;
}
}
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
when(clusterService.state()).thenReturn(state);
final int finalNumIngestNodes = numIngestNodes;
doAnswer(mock -> {
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
handler.handleException(new TransportException("test failure"));
for (int i = 1; i < finalNumIngestNodes; i++) {
if (randomBoolean()) {
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
} else {
handler.handleException(new TransportException("test failure"));
}
}
return mock;
}).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any());
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false)));
}
public void testNoIngestNodes() {
// expected exception if there are no nodes:
DiscoveryNodes discoNodes = DiscoveryNodes.builder()
.build();
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
when(clusterService.state()).thenReturn(state);
try {
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
fail("exception expected");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
}
// expected exception if there are no ingest nodes:
discoNodes = DiscoveryNodes.builder()
.put(new DiscoveryNode("_name", "_id", new LocalTransportAddress("_id"), Collections.singletonMap("ingest", "false"), Version.CURRENT))
.build();
state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
when(clusterService.state()).thenReturn(state);
try {
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
fail("exception expected");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
}
}
private DiscoveryNode generateDiscoNode(int index, boolean ingestNode) {
Map<String, String> attributes;
if (ingestNode) {
attributes = Collections.singletonMap("ingest", "true");
} else {
attributes = Collections.emptyMap();
}
String id = String.valueOf(index);
return new DiscoveryNode(id, id, new LocalTransportAddress(id), attributes, Version.CURRENT);
}
}

View File

@ -1,9 +1,5 @@
---
"Test basic pipeline crud":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -24,14 +20,6 @@
- match: { _version: 1 }
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.get_pipeline:
id: "my_pipeline"
@ -47,14 +35,6 @@
- match: { _id: "my_pipeline" }
- match: { found: true }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
catch: missing
ingest.get_pipeline:
@ -62,10 +42,6 @@
---
"Test invalid config":
- do:
cluster.health:
wait_for_status: green
- do:
catch: param
ingest.put_pipeline:

View File

@ -1,9 +1,5 @@
---
"Test Grok Pipeline":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -21,14 +17,6 @@
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
@ -48,10 +36,6 @@
---
"Test Grok Pipeline With Custom Pattern":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -97,10 +81,6 @@
---
"Test Grok Pipeline With Custom Pattern Sharing Same Name As Another":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"

View File

@ -1,9 +1,5 @@
---
"Test geoip processor with defaults":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -20,14 +16,6 @@
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
@ -72,14 +60,6 @@
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
@ -108,10 +88,6 @@
---
"Test geoip processor with different database file":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -129,14 +105,6 @@
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test

View File

@ -1,9 +1,5 @@
---
"Test date processor":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -23,14 +19,6 @@
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test

View File

@ -1,9 +1,5 @@
---
"Test mutate processors":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -72,14 +68,6 @@
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test

View File

@ -1,9 +1,5 @@
---
"Test simulate with stored ingest pipeline":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
@ -21,14 +17,6 @@
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.simulate:
id: "my_pipeline"
@ -53,10 +41,6 @@
---
"Test simulate with provided pipeline definition":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
body: >
@ -87,10 +71,6 @@
---
"Test simulate with no provided pipeline or pipeline_id":
- do:
cluster.health:
wait_for_status: green
- do:
catch: request
ingest.simulate:
@ -114,10 +94,6 @@
---
"Test simulate with verbose flag":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
verbose: true
@ -168,10 +144,6 @@
---
"Test simulate with exception thrown":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
body: >
@ -213,10 +185,6 @@
---
"Test verbose simulate with exception thrown":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
verbose: true