Merge pull request #15769 from javanna/enhancement/move_to_core

Move ingest api to core
This commit is contained in:
Luca Cavanna 2016-01-07 16:04:16 +01:00
commit e149704ba2
120 changed files with 1172 additions and 1218 deletions

View File

@ -149,6 +149,16 @@ import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptAction;
import org.elasticsearch.action.indexedscripts.get.TransportGetIndexedScriptAction;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptAction;
import org.elasticsearch.action.indexedscripts.put.TransportPutIndexedScriptAction;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.action.ingest.IngestDisabledActionFilter;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineTransportAction;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineTransportAction;
import org.elasticsearch.action.percolate.MultiPercolateAction;
import org.elasticsearch.action.percolate.PercolateAction;
import org.elasticsearch.action.percolate.TransportMultiPercolateAction;
@ -186,6 +196,8 @@ import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestModule;
import java.util.ArrayList;
import java.util.HashMap;
@ -210,13 +222,13 @@ public class ActionModule extends AbstractModule {
this.transportAction = transportAction;
this.supportTransportActions = supportTransportActions;
}
}
private final boolean ingestEnabled;
private final boolean proxy;
public ActionModule(boolean proxy) {
public ActionModule(Settings settings, boolean proxy) {
this.ingestEnabled = IngestModule.isIngestEnabled(settings);
this.proxy = proxy;
}
@ -240,6 +252,13 @@ public class ActionModule extends AbstractModule {
@Override
protected void configure() {
if (proxy == false) {
if (ingestEnabled) {
registerFilter(IngestActionFilter.class);
} else {
registerFilter(IngestDisabledActionFilter.class);
}
}
Multibinder<ActionFilter> actionFilterMultibinder = Multibinder.newSetBinder(binder(), ActionFilter.class);
for (Class<? extends ActionFilter> actionFilter : actionFilters) {
@ -340,6 +359,11 @@ public class ActionModule extends AbstractModule {
registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);
registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
// register Name -> GenericAction Map that can be injected to instances.
MapBinder<String, GenericAction> actionsBinder
= MapBinder.newMapBinder(binder(), String.class, GenericAction.class);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.delete.DeleteResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteResponse;
@ -26,8 +26,8 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,14 +17,14 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.ingest.PipelineDefinition;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
@ -25,9 +25,9 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineDefinition;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -31,9 +31,8 @@ import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
import java.util.ArrayList;
@ -44,6 +43,10 @@ import java.util.Set;
public final class IngestActionFilter extends AbstractComponent implements ActionFilter {
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
public static final String PIPELINE_ID_PARAM = "pipeline";
static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
private final PipelineExecutionService executionService;
@Inject
@ -54,9 +57,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
@Override
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
String pipelineId = request.getFromContext(PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId == null) {
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
pipelineId = request.getHeader(PIPELINE_ID_PARAM);
if (pipelineId == null) {
chain.proceed(task, action, request, listener);
return;
@ -84,7 +87,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
// The IndexRequest has the same type on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence this check
if (indexRequest.hasHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED)) {
if (indexRequest.hasHeader(PIPELINE_ALREADY_PROCESSED)) {
chain.proceed(task, action, indexRequest, listener);
return;
}
@ -92,7 +95,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
logger.error("failed to execute pipeline [{}]", t, pipelineId);
listener.onFailure(t);
}, success -> {
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
indexRequest.putHeader(PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(task, action, indexRequest, listener);
});
}

View File

@ -16,25 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.tasks.Task;
public final class IngestDisabledActionFilter implements ActionFilter {
@Override
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
String pipelineId = request.getFromContext(IngestActionFilter.PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId != null) {
failRequest(pipelineId);
}
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
pipelineId = request.getHeader(IngestActionFilter.PIPELINE_ID_PARAM);
if (pipelineId != null) {
failRequest(pipelineId);
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.index.IndexResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
@ -26,8 +26,8 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -17,13 +17,14 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.reload;
package org.elasticsearch.action.ingest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
@ -33,8 +34,6 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@ -55,25 +54,14 @@ public class ReloadPipelinesAction extends AbstractComponent implements Transpor
this.pipelineStore = pipelineStore;
this.clusterService = clusterService;
this.transportService = transportService;
transportService.registerRequestHandler(ACTION_NAME, ReloadPipelinesRequest::new, ThreadPool.Names.SAME, this);
transportService.registerRequestHandler(ACTION_NAME, ReloadPipelinesRequest::new, ThreadPool.Names.MANAGEMENT, this);
}
public void reloadPipelinesOnAllNodes(Consumer<Boolean> listener) {
List<DiscoveryNode> ingestNodes = new ArrayList<>();
for (DiscoveryNode node : clusterService.state().getNodes()) {
String nodeEnabled = node.getAttributes().get("ingest");
if ("true".equals(nodeEnabled)) {
ingestNodes.add(node);
}
}
if (ingestNodes.isEmpty()) {
throw new IllegalStateException("There are no ingest nodes in this cluster");
}
AtomicBoolean failed = new AtomicBoolean();
AtomicInteger expectedResponses = new AtomicInteger(ingestNodes.size());
for (DiscoveryNode node : ingestNodes) {
DiscoveryNodes nodes = clusterService.state().getNodes();
AtomicInteger expectedResponses = new AtomicInteger(nodes.size());
for (DiscoveryNode node : nodes) {
ReloadPipelinesRequest nodeRequest = new ReloadPipelinesRequest();
transportService.sendRequest(node, ACTION_NAME, nodeRequest, new TransportResponseHandler<ReloadPipelinesResponse>() {
@Override
@ -101,7 +89,7 @@ public class ReloadPipelinesAction extends AbstractComponent implements Transpor
@Override
public String executor() {
return ThreadPool.Names.MANAGEMENT;
return ThreadPool.Names.SAME;
}
});
}

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;

View File

@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import java.io.IOException;
import java.util.Collections;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@ -17,12 +17,12 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,17 +17,17 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.PipelineStore;
import java.io.IOException;
import java.util.ArrayList;
@ -36,7 +36,7 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.ingest.IngestDocument.MetaData;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData;
public class SimulatePipelineRequest extends ActionRequest {

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
@ -26,8 +26,8 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
@ -25,7 +25,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import java.io.IOException;
import java.util.Collections;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -25,7 +25,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import java.io.IOException;
import java.util.Collections;

View File

@ -147,7 +147,7 @@ public class TransportClient extends AbstractClient {
// noop
}
});
modules.add(new ActionModule(true));
modules.add(new ActionModule(this.settings, true));
modules.add(new CircuitBreakerModule(this.settings));
pluginsService.processModules(modules);

View File

@ -113,6 +113,10 @@ import org.elasticsearch.rest.action.get.RestGetSourceAction;
import org.elasticsearch.rest.action.get.RestHeadAction;
import org.elasticsearch.rest.action.get.RestMultiGetAction;
import org.elasticsearch.rest.action.index.RestIndexAction;
import org.elasticsearch.rest.action.ingest.RestDeletePipelineAction;
import org.elasticsearch.rest.action.ingest.RestGetPipelineAction;
import org.elasticsearch.rest.action.ingest.RestPutPipelineAction;
import org.elasticsearch.rest.action.ingest.RestSimulatePipelineAction;
import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.rest.action.percolate.RestMultiPercolateAction;
import org.elasticsearch.rest.action.percolate.RestPercolateAction;
@ -256,7 +260,13 @@ public class NetworkModule extends AbstractModule {
RestCatAction.class,
// Tasks API
RestListTasksAction.class
RestListTasksAction.class,
// Ingest API
RestPutPipelineAction.class,
RestGetPipelineAction.class,
RestDeletePipelineAction.class,
RestSimulatePipelineAction.class
);
private static final List<Class<? extends AbstractCatAction>> builtinCatHandlers = Arrays.asList(

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
@ -42,7 +43,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
/**
* Instantiates and wires all the services that the ingest plugin will be needing.
@ -57,21 +57,24 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
private final Environment environment;
private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;
private final Map<String, ProcessorFactoryProvider> processorFactoryProvider;
private final ProcessorsRegistry processorsRegistry;
@Inject
public IngestBootstrapper(Settings settings, ThreadPool threadPool, Environment environment,
ClusterService clusterService, TransportService transportService,
Map<String, ProcessorFactoryProvider> processorFactoryProvider) {
ProcessorsRegistry processorsRegistry) {
super(settings);
this.threadPool = threadPool;
this.environment = environment;
this.processorFactoryProvider = processorFactoryProvider;
this.processorsRegistry = processorsRegistry;
this.pipelineStore = new PipelineStore(settings, clusterService, transportService);
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
boolean isNoTribeNode = settings.getByPrefix("tribe.").getAsMap().isEmpty();
if (isNoTribeNode) {
clusterService.add(this);
}
}
// for testing:
IngestBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService,
@ -82,7 +85,7 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
clusterService.add(this);
this.pipelineStore = pipelineStore;
this.pipelineExecutionService = pipelineExecutionService;
this.processorFactoryProvider = null;
this.processorsRegistry = null;
}
public PipelineStore getPipelineStore() {
@ -101,7 +104,7 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
@Inject
public void setScriptService(ScriptService scriptService) {
pipelineStore.buildProcessorFactoryRegistry(processorFactoryProvider, environment, scriptService);
pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService);
}
@Override
@ -153,6 +156,7 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
}
void forkAndInstallIngestIndexTemplate() {
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
installIngestIndexTemplate();
@ -160,11 +164,14 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
logger.debug("Failed to install .ingest index template", e);
}
});
} catch (EsRejectedExecutionException e) {
logger.debug("async fork and install template failed", e);
}
}
void installIngestIndexTemplate() throws IOException {
logger.debug("installing .ingest index template...");
try (InputStream is = IngestBootstrapper.class.getResourceAsStream("/ingest.json")) {
try (InputStream is = IngestBootstrapper.class.getResourceAsStream("ingest.json")) {
final byte[] template;
try (BytesStreamOutput out = new BytesStreamOutput()) {
Streams.copy(is, out);
@ -195,6 +202,7 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
}
void startPipelineStore(MetaData metaData) {
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
// Before we start the pipeline store we check if the index template exists,
@ -204,14 +212,18 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
installIngestIndexTemplate();
}
pipelineStore.start();
} catch (Exception e) {
logger.warn("pipeline store failed to start, retrying...", e);
} catch (Exception e1) {
logger.warn("pipeline store failed to start, retrying...", e1);
startPipelineStore(metaData);
}
});
} catch (EsRejectedExecutionException e) {
logger.debug("async pipeline store start failed", e);
}
}
void stopPipelineStore(String reason) {
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
pipelineStore.stop(reason);
@ -219,6 +231,9 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
logger.error("pipeline store stop failure", e);
}
});
} catch (EsRejectedExecutionException e) {
logger.debug("async pipeline store stop failed", e);
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.rest.action.ingest.IngestRestFilter;
import java.util.function.BiFunction;
/**
* Registry for processor factories
* @see Processor.Factory
*/
public class IngestModule extends AbstractModule {
private final ProcessorsRegistry processorsRegistry;
public IngestModule() {
this.processorsRegistry = new ProcessorsRegistry();
}
@Override
protected void configure() {
binder().bind(IngestRestFilter.class).asEagerSingleton();
bind(ProcessorsRegistry.class).toInstance(processorsRegistry);
binder().bind(IngestBootstrapper.class).asEagerSingleton();
}
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) {
processorsRegistry.registerProcessor(type, processorFactoryProvider);
}
public static boolean isIngestEnabled(Settings settings) {
return settings.getAsBoolean("node.ingest", false);
}
}

View File

@ -17,10 +17,10 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
@ -30,9 +30,7 @@ import org.elasticsearch.script.ScriptService;
import java.util.Collections;
import java.util.Map;
class InternalTemplateService implements TemplateService {
public static final ScriptContext.Plugin INGEST_SCRIPT_CONTEXT = new ScriptContext.Plugin("elasticsearch-ingest", "ingest");
public class InternalTemplateService implements TemplateService {
private final ScriptService scriptService;
@ -48,7 +46,7 @@ class InternalTemplateService implements TemplateService {
Script script = new Script(template, ScriptService.ScriptType.INLINE, "mustache", Collections.emptyMap());
CompiledScript compiledScript = scriptService.compile(
script,
INGEST_SCRIPT_CONTEXT,
ScriptContext.Standard.INGEST,
null /* we can supply null here, because ingest doesn't use indexed scripts */,
Collections.emptyMap()
);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -26,7 +26,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.core.Pipeline;
import java.io.IOException;

View File

@ -17,14 +17,12 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
@ -32,8 +30,6 @@ import java.util.function.Consumer;
public class PipelineExecutionService {
static final String THREAD_POOL_NAME = IngestPlugin.NAME;
private final PipelineStore store;
private final ThreadPool threadPool;
@ -44,7 +40,7 @@ public class PipelineExecutionService {
public void execute(IndexRequest request, String pipelineId, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(pipelineId);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
try {
innerExecute(request, pipeline);
completionHandler.accept(true);
@ -57,7 +53,7 @@ public class PipelineExecutionService {
public void execute(Iterable<ActionRequest> actionRequests, String pipelineId,
Consumer<Throwable> itemFailureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(pipelineId);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
for (ActionRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest) == false) {
continue;
@ -108,20 +104,4 @@ public class PipelineExecutionService {
}
return pipeline;
}
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// the TP is already configured in the node settings
// no need for additional settings
return Settings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings);
return Settings.builder()
.put("threadpool." + THREAD_POOL_NAME + ".type", "fixed")
.put("threadpool." + THREAD_POOL_NAME + ".size", availableProcessors)
.put("threadpool." + THREAD_POOL_NAME + ".queue_size", 200)
.build();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
@ -40,12 +40,12 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.plugin.ingest.transport.reload.ReloadPipelinesAction;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.ReloadPipelinesAction;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -59,6 +59,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
public class PipelineStore extends AbstractComponent implements Closeable {
@ -84,11 +85,11 @@ public class PipelineStore extends AbstractComponent implements Closeable {
this.client = client;
}
public void buildProcessorFactoryRegistry(Map<String, ProcessorFactoryProvider> processorFactoryProviders, Environment environment, ScriptService scriptService) {
public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, Environment environment, ScriptService scriptService) {
Map<String, Processor.Factory> processorFactories = new HashMap<>();
TemplateService templateService = new InternalTemplateService(scriptService);
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
Processor.Factory processorFactory = entry.getValue().get(environment, templateService);
for (Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> entry : processorsRegistry.entrySet()) {
Processor.Factory processorFactory = entry.getValue().apply(environment, templateService);
processorFactories.put(entry.getKey(), processorFactory);
}
this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
public class ProcessorsRegistry {
private final Map<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> processorFactoryProviders = new HashMap<>();
/**
* Adds a processor factory under a specific name.
*/
public void registerProcessor(String name, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) {
BiFunction<Environment, TemplateService, Processor.Factory<?>> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider);
if (provider != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
}
}
public Set<Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>>> entrySet() {
return processorFactoryProviders.entrySet();
}
}

View File

@ -18,9 +18,7 @@
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
package org.elasticsearch.ingest.core;
import java.util.Arrays;
import java.util.Collections;
@ -42,6 +40,7 @@ public class CompoundProcessor implements Processor {
public CompoundProcessor(Processor... processor) {
this(Arrays.asList(processor), Collections.emptyList());
}
public CompoundProcessor(List<Processor> processors, List<Processor> onFailureProcessors) {
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
@ -57,7 +56,7 @@ public class CompoundProcessor implements Processor {
@Override
public String getType() {
return "compound[" + processors.stream().map(p -> p.getType()).collect(Collectors.joining(",")) + "]";
return "compound[" + processors.stream().map(Processor::getType).collect(Collectors.joining(",")) + "]";
}
@Override

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest.processor;
package org.elasticsearch.ingest.core;
import java.util.List;
import java.util.Map;

View File

@ -17,9 +17,17 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@ -38,7 +46,6 @@ import java.util.TimeZone;
public final class IngestDocument {
public final static String INGEST_KEY = "_ingest";
public final static String SOURCE_KEY = "_source";
static final String TIMESTAMP = "timestamp";
@ -348,7 +355,7 @@ public final class IngestDocument {
if (append) {
if (map.containsKey(leafKey)) {
Object object = map.get(leafKey);
List<Object> list = appendValues(path, object, value);
List<Object> list = appendValues(object, value);
if (list != object) {
map.put(leafKey, list);
}
@ -374,7 +381,7 @@ public final class IngestDocument {
}
if (append) {
Object object = list.get(index);
List<Object> newList = appendValues(path, object, value);
List<Object> newList = appendValues(object, value);
if (newList != object) {
list.set(index, newList);
}
@ -387,7 +394,7 @@ public final class IngestDocument {
}
@SuppressWarnings("unchecked")
private static List<Object> appendValues(String path, Object maybeList, Object value) {
private static List<Object> appendValues(Object maybeList, Object value) {
List<Object> list;
if (maybeList instanceof List) {
//maybeList is already a list, we append the provided values to it
@ -427,7 +434,7 @@ public final class IngestDocument {
private Map<String, Object> createTemplateModel() {
Map<String, Object> model = new HashMap<>(sourceAndMetadata);
model.put(SOURCE_KEY, sourceAndMetadata);
model.put(SourceFieldMapper.NAME, sourceAndMetadata);
// If there is a field in the source with the name '_ingest' it gets overwritten here,
// if access to that field is required then it get accessed via '_source._ingest'
model.put(INGEST_KEY, ingestMetadata);
@ -489,13 +496,13 @@ public final class IngestDocument {
}
public enum MetaData {
INDEX("_index"),
TYPE("_type"),
ID("_id"),
ROUTING("_routing"),
PARENT("_parent"),
TIMESTAMP("_timestamp"),
TTL("_ttl");
INDEX(IndexFieldMapper.NAME),
TYPE(TypeFieldMapper.NAME),
ID(IdFieldMapper.NAME),
ROUTING(RoutingFieldMapper.NAME),
PARENT(ParentFieldMapper.NAME),
TIMESTAMP(TimestampFieldMapper.NAME),
TTL(TTLFieldMapper.NAME);
private final String fieldName;
@ -506,7 +513,6 @@ public final class IngestDocument {
public String getFieldName() {
return fieldName;
}
}
private class FieldPath {
@ -523,7 +529,7 @@ public final class IngestDocument {
newPath = path.substring(8, path.length());
} else {
initialContext = sourceAndMetadata;
if (path.startsWith(SOURCE_KEY + ".")) {
if (path.startsWith(SourceFieldMapper.NAME + ".")) {
newPath = path.substring(8, path.length());
} else {
newPath = path;

View File

@ -18,11 +18,7 @@
*/
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.CompoundProcessor;
package org.elasticsearch.ingest.core;
import java.util.ArrayList;
import java.util.Arrays;
@ -92,12 +88,10 @@ public final class Pipeline {
}
if (onFailureProcessors.isEmpty()) {
return processor;
} else {
return new CompoundProcessor(Arrays.asList(processor), onFailureProcessors);
}
} else {
throw new IllegalArgumentException("No processor type exist with name [" + type + "]");
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
}
throw new IllegalArgumentException("No processor type exists with name [" + type + "]");
}
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception {
@ -121,6 +115,5 @@ public final class Pipeline {
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor);
}
}
}

View File

@ -18,9 +18,7 @@
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
package org.elasticsearch.ingest.core;
import java.util.Map;

View File

@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.Map;
/**
* Abstraction for the template engine.
* Abstraction for the ingest template engine: allows to compile a template into a {@link Template} object.
* A compiled template can be executed by calling its {@link Template#execute(Map)} method.
*/
// NOTE: this abstraction is added because the 'org.elasticsearch.ingest' has the requirement to be ES agnostic
public interface TemplateService {
Template compile(String template);
@ -33,7 +33,5 @@ public interface TemplateService {
String execute(Map<String, Object> model);
String getKey();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.ArrayList;
import java.util.HashMap;

View File

@ -70,9 +70,9 @@ import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.ingest.IngestModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
@ -189,7 +189,7 @@ public class Node implements Releasable {
modules.add(new ClusterModule(this.settings));
modules.add(new IndicesModule());
modules.add(new SearchModule());
modules.add(new ActionModule(false));
modules.add(new ActionModule(this.settings, false));
modules.add(new GatewayModule(settings));
modules.add(new NodeClientModule());
modules.add(new PercolatorModule());
@ -197,6 +197,7 @@ public class Node implements Releasable {
modules.add(new RepositoriesModule());
modules.add(new TribeModule());
modules.add(new AnalysisModule(environment));
modules.add(new IngestModule());
pluginsService.processModules(modules);

View File

@ -17,8 +17,9 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -26,9 +27,6 @@ import org.elasticsearch.rest.RestFilter;
import org.elasticsearch.rest.RestFilterChain;
import org.elasticsearch.rest.RestRequest;
import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM;
import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY;
public class IngestRestFilter extends RestFilter {
@Inject
@ -38,8 +36,8 @@ public class IngestRestFilter extends RestFilter {
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
if (request.hasParam(PIPELINE_ID_PARAM)) {
request.putInContext(PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(PIPELINE_ID_PARAM));
if (request.hasParam(IngestActionFilter.PIPELINE_ID_PARAM)) {
request.putInContext(IngestActionFilter.PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(IngestActionFilter.PIPELINE_ID_PARAM));
}
filterChain.continueProcessing(request, channel);
}

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -17,14 +17,14 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -37,7 +37,7 @@ public interface ScriptContext {
*/
enum Standard implements ScriptContext {
AGGS("aggs"), SEARCH("search"), UPDATE("update");
AGGS("aggs"), SEARCH("search"), UPDATE("update"), INGEST("ingest");
private final String key;

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.IngestModule;
import java.io.IOException;
import java.util.ArrayList;
@ -87,6 +88,7 @@ public class ThreadPool extends AbstractComponent {
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
public static final String INGEST = "ingest";
}
public enum ThreadPoolType {
@ -145,6 +147,7 @@ public class ThreadPool extends AbstractComponent {
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
map.put(Names.INGEST, ThreadPoolType.FIXED);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}
@ -234,6 +237,9 @@ public class ThreadPool extends AbstractComponent {
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m"));
if (IngestModule.isIngestEnabled(settings)) {
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INGEST).size(availableProcessors).queueSize(200));
}
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);

View File

@ -1,4 +1,4 @@
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
/*
* Licensed to Elasticsearch under one or more contributor

View File

@ -17,11 +17,10 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -30,34 +29,31 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.Matchers;
import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.BulkRequestModifier;
import static org.elasticsearch.action.ingest.IngestActionFilter.BulkRequestModifier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -92,13 +88,13 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(executionService).execute(Matchers.any(IndexRequest.class), Matchers.eq("_id"), Matchers.any(Consumer.class), Matchers.any(Consumer.class));
verifyZeroInteractions(actionFilterChain);
}
@ -106,13 +102,13 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
indexRequest.putInContext(IngestActionFilter.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(executionService).execute(Matchers.any(IndexRequest.class), Matchers.eq("_id"), Matchers.any(Consumer.class), Matchers.any(Consumer.class));
verifyZeroInteractions(actionFilterChain);
}
@ -120,8 +116,8 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ALREADY_PROCESSED, true);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -135,7 +131,7 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -157,23 +153,20 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
RuntimeException exception = new RuntimeException();
Answer answer = new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Answer answer = invocationOnMock -> {
Consumer<Throwable> handler = (Consumer) invocationOnMock.getArguments()[2];
handler.accept(exception);
return null;
}
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(executionService).execute(Matchers.any(IndexRequest.class), Matchers.eq("_id"), Matchers.any(Consumer.class), Matchers.any(Consumer.class));
verify(actionListener).onFailure(exception);
verifyZeroInteractions(actionFilterChain);
}
@ -202,7 +195,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
bulkRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
if (rarely()) {

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.reload;
package org.elasticsearch.action.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
@ -27,20 +27,17 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.Matchers;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -62,52 +59,29 @@ public class ReloadPipelinesActionTests extends ESTestCase {
public void testSuccess() {
int numNodes = randomIntBetween(1, 10);
int numIngestNodes = 0;
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (int i = 0; i < numNodes; i++) {
boolean ingestNode = i == 0 || randomBoolean();
DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
discoNodes.put(discoNode);
if (ingestNode) {
numIngestNodes++;
}
}
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
when(clusterService.state()).thenReturn(state);
final int finalNumIngestNodes = numIngestNodes;
doAnswer(mock -> {
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
for (int i = 0; i < finalNumIngestNodes; i++) {
for (int i = 0; i < numNodes; i++) {
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
}
return mock;
}).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any());
}).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any());
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(true)));
}
public void testWithAtLeastOneFailure() {
int numNodes = randomIntBetween(1, 10);
int numIngestNodes = 0;
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (int i = 0; i < numNodes; i++) {
boolean ingestNode = i == 0 || randomBoolean();
DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
discoNodes.put(discoNode);
if (ingestNode) {
numIngestNodes++;
}
}
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
when(clusterService.state()).thenReturn(state);
final int finalNumIngestNodes = numIngestNodes;
doAnswer(mock -> {
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
handler.handleException(new TransportException("test failure"));
for (int i = 1; i < finalNumIngestNodes; i++) {
for (int i = 1; i < numNodes; i++) {
if (randomBoolean()) {
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
} else {
@ -115,48 +89,17 @@ public class ReloadPipelinesActionTests extends ESTestCase {
}
}
return mock;
}).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any());
}).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any());
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false)));
}
public void testNoIngestNodes() {
// expected exception if there are no nodes:
DiscoveryNodes discoNodes = DiscoveryNodes.builder()
.build();
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
when(clusterService.state()).thenReturn(state);
try {
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
fail("exception expected");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
private static DiscoveryNodes.Builder generateDiscoNodes(int numNodes) {
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (int i = 0; i < numNodes; i++) {
String id = Integer.toString(i);
DiscoveryNode discoNode = new DiscoveryNode(id, id, new LocalTransportAddress(id), Collections.emptyMap(), Version.CURRENT);
discoNodes.put(discoNode);
}
// expected exception if there are no ingest nodes:
discoNodes = DiscoveryNodes.builder()
.put(new DiscoveryNode("_name", "_id", new LocalTransportAddress("_id"), Collections.singletonMap("ingest", "false"), Version.CURRENT))
.build();
state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
when(clusterService.state()).thenReturn(state);
try {
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
fail("exception expected");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
return discoNodes;
}
}
private DiscoveryNode generateDiscoNode(int index, boolean ingestNode) {
Map<String, String> attributes;
if (ingestNode) {
attributes = Collections.singletonMap("ingest", "true");
} else {
attributes = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap("ingest", "false");
}
String id = String.valueOf(index);
return new DiscoveryNode(id, id, new LocalTransportAddress(id), attributes, Version.CURRENT);
}
}

View File

@ -17,11 +17,12 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -17,37 +17,33 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulateExecutionService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class SimulateExecutionServiceTests extends ESTestCase {
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private Pipeline pipeline;
private CompoundProcessor processor;
private IngestDocument ingestDocument;
@Before
@ -58,9 +54,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
.build()
);
executionService = new SimulateExecutionService(threadPool);
processor = mock(CompoundProcessor.class);
when(processor.getType()).thenReturn("mock");
pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
}
@ -70,8 +63,10 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
verify(processor, times(2)).execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
@ -91,8 +86,10 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
verify(processor, times(2)).execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
assertThat(simulateDocumentSimpleResult.getIngestDocument(), equalTo(ingestDocument));
@ -100,10 +97,12 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).doNothing().when(processor).execute(ingestDocument);
TestProcessor processor1 = new TestProcessor("mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
verify(processor, times(2)).execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
@ -116,15 +115,13 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocument));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
}
public void testExecuteItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).when(processor).execute(ingestDocument);
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
verify(processor, times(1)).execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
assertThat(simulateDocumentSimpleResult.getIngestDocument(), nullValue());

View File

@ -17,29 +17,30 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.ingest.IngestDocument.MetaData.ID;
import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX;
import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE;
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.ID;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.INDEX;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.TYPE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
@ -51,8 +52,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
@Before
public void init() throws IOException {
CompoundProcessor pipelineCompoundProcessor = mock(CompoundProcessor.class);
when(pipelineCompoundProcessor.getProcessors()).thenReturn(Arrays.asList(mock(Processor.class)));
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mock_processor", mock(Processor.Factory.class));

View File

@ -17,11 +17,16 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -17,11 +17,12 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -17,12 +17,12 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;

View File

@ -63,6 +63,8 @@ import static org.hamcrest.Matchers.nullValue;
*
*/
public class SimpleIndexTemplateIT extends ESIntegTestCase {
@AwaitsFix(bugUrl = "temporarily ignored till we have removed the ingest index template")
public void testSimpleIndexTemplateTests() throws Exception {
// clean all templates setup by the framework.
client().admin().indices().prepareDeleteTemplate("*").get();
@ -313,6 +315,7 @@ public class SimpleIndexTemplateIT extends ESIntegTestCase {
}
}
@AwaitsFix(bugUrl = "temporarily ignored till we have removed the ingest index template")
public void testInvalidSettings() throws Exception {
// clean all templates setup by the framework.
client().admin().indices().prepareDeleteTemplate("*").get();

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
@ -55,7 +55,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;

View File

@ -19,41 +19,37 @@
package org.elasticsearch.ingest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateDocumentSimpleResult;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
@ -73,7 +69,7 @@ public class IngestClientIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(IngestPlugin.NODE_INGEST_SETTING, true)
.put("node.ingest", true)
.build();
}
@ -81,7 +77,8 @@ public class IngestClientIT extends ESIntegTestCase {
protected Settings externalClusterClientSettings() {
return Settings.builder()
.put(super.transportClientSettings())
.put(IngestPlugin.NODE_INGEST_SETTING, true)
//TODO can we remove this?
.put("node.ingest", true)
.build();
}
@ -92,9 +89,7 @@ public class IngestClientIT extends ESIntegTestCase {
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("grok")
.field("field", "field1")
.field("pattern", "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>")
.startObject("test")
.endObject()
.endObject()
.endArray()
@ -117,6 +112,7 @@ public class IngestClientIT extends ESIntegTestCase {
.field("_id", "id")
.startObject("_source")
.field("foo", "bar")
.field("fail", false)
.endObject()
.endObject()
.endArray()
@ -128,35 +124,10 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(response.getResults().size(), equalTo(1));
assertThat(response.getResults().get(0), instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
assertThat(simulateDocumentSimpleResult.getIngestDocument(), nullValue());
assertThat(simulateDocumentSimpleResult.getFailure(), notNullValue());
response = new SimulatePipelineRequestBuilder(client(), SimulatePipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
.startArray("docs")
.startObject()
.field("_index", "index")
.field("_type", "type")
.field("_id", "id")
.startObject("_source")
.field("field1", "123.42 400 <foo>")
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
assertThat(response.isVerbose(), equalTo(false));
assertThat(response.getPipelineId(), equalTo("_id"));
assertThat(response.getResults().size(), equalTo(1));
assertThat(response.getResults().get(0), instanceOf(SimulateDocumentSimpleResult.class));
simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
Map<String, Object> source = new HashMap<>();
source.put("field1", "123.42 400 <foo>");
source.put("val", 123.42f);
source.put("status", 400);
source.put("msg", "foo");
source.put("foo", "bar");
source.put("fail", false);
source.put("processed", true);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
assertThat(simulateDocumentSimpleResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
@ -171,9 +142,7 @@ public class IngestClientIT extends ESIntegTestCase {
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("join")
.field("field", "field1")
.field("separator", "|")
.startObject("test")
.endObject()
.endObject()
.endArray()
@ -182,14 +151,10 @@ public class IngestClientIT extends ESIntegTestCase {
int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
bulkRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
if (i % 2 == 0) {
indexRequest.source("field1", Arrays.asList("value1", "value2"));
} else {
indexRequest.source("field2", Arrays.asList("value1", "value2"));
}
indexRequest.source("field", "value", "fail", i % 2 == 0);
bulkRequest.add(indexRequest);
}
@ -198,12 +163,12 @@ public class IngestClientIT extends ESIntegTestCase {
for (int i = 0; i < bulkRequest.requests().size(); i++) {
BulkItemResponse itemResponse = response.getItems()[i];
if (i % 2 == 0) {
BulkItemResponse.Failure failure = itemResponse.getFailure();
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: test processor failed"));
} else {
IndexResponse indexResponse = itemResponse.getResponse();
assertThat(indexResponse.getId(), equalTo(Integer.toString(i)));
assertThat(indexResponse.isCreated(), is(true));
} else {
BulkItemResponse.Failure failure = itemResponse.getFailure();
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: field [field1] not present as part of path [field1]"));
}
}
}
@ -215,9 +180,7 @@ public class IngestClientIT extends ESIntegTestCase {
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("grok")
.field("field", "field1")
.field("pattern", "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>")
.startObject("test")
.endObject()
.endObject()
.endArray()
@ -230,32 +193,21 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(getResponse.pipelines().size(), equalTo(1));
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
createIndex("test");
XContentBuilder updateMappingBuilder = jsonBuilder().startObject().startObject("properties")
.startObject("status").field("type", "integer").endObject()
.startObject("val").field("type", "float").endObject()
.endObject();
PutMappingResponse putMappingResponse = client().admin().indices()
.preparePutMapping("test").setType("type").setSource(updateMappingBuilder).get();
assertAcked(putMappingResponse);
client().prepareIndex("test", "type", "1").setSource("field1", "123.42 400 <foo>")
.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id")
client().prepareIndex("test", "type", "1").setSource("field", "value", "fail", false)
.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id")
.get();
Map<String, Object> doc = client().prepareGet("test", "type", "1")
.get().getSourceAsMap();
assertThat(doc.get("val"), equalTo(123.42));
assertThat(doc.get("status"), equalTo(400));
assertThat(doc.get("msg"), equalTo("foo"));
assertThat(doc.get("field"), equalTo("value"));
assertThat(doc.get("processed"), equalTo(true));
client().prepareBulk().add(
client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 <foo>")
).putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id").get();
client().prepareIndex("test", "type", "2").setSource("field", "value2", "fail", false)
).putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id").get();
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
assertThat(doc.get("val"), equalTo(123.42));
assertThat(doc.get("status"), equalTo(400));
assertThat(doc.get("msg"), equalTo("foo"));
assertThat(doc.get("field"), equalTo("value2"));
assertThat(doc.get("processed"), equalTo(true));
DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
.setId("_id")
@ -274,4 +226,28 @@ public class IngestClientIT extends ESIntegTestCase {
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Collections.emptyList();
}
public static class IngestPlugin extends Plugin {
@Override
public String name() {
return "ingest";
}
@Override
public String description() {
return "ingest mock";
}
public void onModule(IngestModule ingestModule) {
ingestModule.registerProcessor("test", (environment, templateService) -> config ->
new TestProcessor("test", ingestDocument -> {
ingestDocument.setFieldValue("processed", true);
if (ingestDocument.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");
}
})
);
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;
import org.junit.Before;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class IngestTemplateTests extends ESSingleNodeTestCase {
@Override
protected boolean resetNodeAfterTest() {
return true;
}
public void testIngestIndexTemplateIsInstalled() throws Exception {
assertBusy(IngestTemplateTests::verifyIngestIndexTemplateExist);
}
public void testInstallTemplateAfterItHasBeenRemoved() throws Exception {
assertBusy(IngestTemplateTests::verifyIngestIndexTemplateExist);
client().admin().indices().prepareDeleteTemplate(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME).get();
assertBusy(IngestTemplateTests::verifyIngestIndexTemplateExist);
}
private static void verifyIngestIndexTemplateExist() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().size(), Matchers.equalTo(1));
assertThat(response.getIndexTemplates().get(0).getName(), Matchers.equalTo(IngestBootstrapper.INGEST_INDEX_TEMPLATE_NAME));
assertThat(response.getIndexTemplates().get(0).getOrder(), Matchers.equalTo(Integer.MAX_VALUE));
assertThat(response.getIndexTemplates().get(0).getMappings().size(), Matchers.equalTo(1));
assertThat(response.getIndexTemplates().get(0).getMappings().get(PipelineStore.TYPE), Matchers.notNullValue());
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
@ -26,22 +26,17 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
@ -49,7 +44,8 @@ import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -74,7 +70,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecutePipelineDoesNotExist() {
when(store.get("_id")).thenReturn(null);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
try {
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
@ -91,11 +89,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
//TODO we remove metadata, this check is not valid anymore, what do we replace it with?
//verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
}
@ -117,7 +115,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(any());
@ -138,7 +138,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -153,11 +155,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
//TODO we remove metadata, this check is not valid anymore, what do we replace it with?
//verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, never()).accept(any(RuntimeException.class));
verify(completionHandler, times(1)).accept(true);
}
@ -170,7 +172,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -183,13 +187,15 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor onFailureProcessor = mock(Processor.class);
Processor onFailureOnFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(Arrays.asList(onFailureProcessor),Arrays.asList(onFailureOnFailureProcessor))));
Collections.singletonList(new CompoundProcessor(Collections.singletonList(onFailureProcessor), Collections.singletonList(onFailureOnFailureProcessor))));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -197,48 +203,44 @@ public class PipelineExecutionServiceTests extends ESTestCase {
verify(completionHandler, never()).accept(anyBoolean());
}
@SuppressWarnings("unchecked")
public void testExecuteTTL() throws Exception {
// test with valid ttl
SetProcessor.Factory metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance());
Map<String, Object> config = new HashMap<>();
config.put("field", "_ttl");
config.put("value", "5d");
Processor processor = metaProcessorFactory.create(config);
public void testExecuteSetTTL() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "5d"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl")));
verify(failureHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
}
// test with invalid ttl
metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance());
config = new HashMap<>();
config.put("field", "_ttl");
config.put("value", "abc");
processor = metaProcessorFactory.create(config);
public void testExecuteSetInvalidTTL() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "abc"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
failureHandler = mock(Consumer.class);
completionHandler = mock(Consumer.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(failureHandler, times(1)).accept(any(ElasticsearchParseException.class));
verify(completionHandler, never()).accept(anyBoolean());
}
// test with provided ttl
public void testExecuteProvidedTTL() throws Exception {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", mock(CompoundProcessor.class)));
indexRequest = new IndexRequest("_index", "_type", "_id")
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap())
.ttl(1000L);
failureHandler = mock(Consumer.class);
completionHandler = mock(Consumer.class);
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
assertThat(indexRequest.ttl(), equalTo(new TimeValue(1000L)));
@ -296,7 +298,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
String pipelineId = "_id";
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor()));
@SuppressWarnings("unchecked")
Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(bulkRequest.requests(), pipelineId, requestItemErrorHandler, completionHandler);
@ -305,7 +309,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}
private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {
return Matchers.argThat(new IngestDocumentMatcher(index, type, id, source));
return argThat(new IngestDocumentMatcher(index, type, id, source));
}
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {

View File

@ -19,7 +19,8 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
@ -27,8 +28,6 @@ import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PipelineFactoryTests extends ESTestCase {
@ -38,13 +37,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorRegistry.put("test", processorFactory);
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -56,16 +49,10 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig)));
pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig)));
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorRegistry.put("test-processor", processorFactory);
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
@ -82,12 +69,7 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorRegistry.put("test", processorFactory);
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (IllegalArgumentException e) {
@ -103,14 +85,8 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorFactoryStore = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorFactoryStore.put("test", processorFactory);
Pipeline pipeline = factory.create("_id", pipelineConfig, processorFactoryStore);
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.get.GetRequest;
@ -37,7 +37,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import java.util.ArrayList;
import java.util.Collections;
@ -49,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -166,7 +166,7 @@ public class PipelineStoreTests extends ESTestCase {
}
static GetRequest eqGetRequest(String index, String type, String id) {
return Matchers.argThat(new GetRequestMatcher(index, type, id));
return argThat(new GetRequestMatcher(index, type, id));
}
static class GetRequestMatcher extends ArgumentMatcher<GetRequest> {

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.test.ESTestCase;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import static org.hamcrest.CoreMatchers.equalTo;
public class ProcessorsRegistryTests extends ESTestCase {
public void testAddProcessor() {
ProcessorsRegistry processorsRegistry = new ProcessorsRegistry();
TestProcessor.Factory factory1 = new TestProcessor.Factory();
processorsRegistry.registerProcessor("1", (environment, templateService) -> factory1);
TestProcessor.Factory factory2 = new TestProcessor.Factory();
processorsRegistry.registerProcessor("2", (environment, templateService) -> factory2);
TestProcessor.Factory factory3 = new TestProcessor.Factory();
try {
processorsRegistry.registerProcessor("1", (environment, templateService) -> factory3);
fail("addProcessor should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]"));
}
Set<Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>>> entrySet = processorsRegistry.entrySet();
assertThat(entrySet.size(), equalTo(2));
for (Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> entry : entrySet) {
if (entry.getKey().equals("1")) {
assertThat(entry.getValue().apply(null, null), equalTo(factory1));
} else if (entry.getKey().equals("2")) {
assertThat(entry.getValue().apply(null, null), equalTo(factory2));
} else {
fail("unexpected processor id [" + entry.getKey() + "]");
}
}
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument;
@Before
public void init() {
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
}
public void testEmpty() throws Exception {
CompoundProcessor processor = new CompoundProcessor();
assertThat(processor.getProcessors().isEmpty(), is(true));
assertThat(processor.getOnFailureProcessors().isEmpty(), is(true));
processor.execute(ingestDocument);
}
public void testSingleProcessor() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(1));
}
public void testSingleProcessorWithException() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
try {
compoundProcessor.execute(ingestDocument);
fail("should throw exception");
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("error"));
}
assertThat(processor.getInvokedCounter(), equalTo(1));
}
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
});
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2));
compoundProcessor.execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
}
public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
throw new RuntimeException("error");
});
TestProcessor lastProcessor = new TestProcessor(ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second"));
});
CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor));
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor));
compoundProcessor.execute(ingestDocument);
assertThat(processorToFail.getInvokedCounter(), equalTo(1));
assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest.processor;
package org.elasticsearch.ingest.core;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -53,7 +53,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
assertThat(val, equalTo("bar"));
}
public void testReadStringProperty_InvalidType() {
public void testReadStringPropertyInvalidType() {
try {
ConfigurationUtils.readStringProperty(config, "arr");
} catch (IllegalArgumentException e) {

View File

@ -17,8 +17,10 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

View File

@ -17,8 +17,10 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
@ -67,5 +69,4 @@ public class ValueSourceTests extends ESTestCase {
assertThat(myPreciousList.size(), equalTo(1));
assertThat(myPreciousList.get(0), equalTo("value"));
}
}

View File

@ -63,7 +63,8 @@ public class FileScriptTests extends ESTestCase {
.put("script.engine." + MockScriptEngine.NAME + ".file.aggs", false)
.put("script.engine." + MockScriptEngine.NAME + ".file.search", false)
.put("script.engine." + MockScriptEngine.NAME + ".file.mapping", false)
.put("script.engine." + MockScriptEngine.NAME + ".file.update", false).build();
.put("script.engine." + MockScriptEngine.NAME + ".file.update", false)
.put("script.engine." + MockScriptEngine.NAME + ".file.ingest", false).build();
ScriptService scriptService = makeScriptService(settings);
Script script = new Script("script1", ScriptService.ScriptType.FILE, MockScriptEngine.NAME, null);
for (ScriptContext context : ScriptContext.Standard.values()) {

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
public class ThreadPoolTests extends ESTestCase {
public void testIngestThreadPoolNotStartedWithIngestDisabled() throws Exception {
Settings settings = Settings.builder().put("name", "test").put("node.ingest", false).build();
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settings);
for (ThreadPool.Info info : threadPool.info()) {
assertThat(info.getName(), not(equalTo("ingest")));
}
} finally {
if (threadPool != null) {
terminate(threadPool);
}
}
}
public void testIngestThreadPoolStartedWithIngestEnabled() throws Exception {
Settings settings = Settings.builder().put("name", "test").put("node.ingest", true).build();
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settings);
boolean ingestFound = false;
for (ThreadPool.Info info : threadPool.info()) {
if (info.getName().equals("ingest")) {
ingestFound = true;
break;
}
}
assertThat(ingestFound, equalTo(true));
} finally {
if (threadPool != null) {
terminate(threadPool);
}
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.IngestModule;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.GeoIpProcessor;
import org.elasticsearch.ingest.processor.GrokProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.plugins.Plugin;
public class IngestPlugin extends Plugin {
public static final String NAME = "ingest";
@Override
public String name() {
return NAME;
}
@Override
public String description() {
return "Plugin that allows to plug in ingest processors";
}
public void onModule(IngestModule ingestModule) {
ingestModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
ingestModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
ingestModule.registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
ingestModule.registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
ingestModule.registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
ingestModule.registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
ingestModule.registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
ingestModule.registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
ingestModule.registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
ingestModule.registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
ingestModule.registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
ingestModule.registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
ingestModule.registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
ingestModule.registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
ingestModule.registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
}
}

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,9 +19,11 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.ArrayList;
import java.util.List;

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;

View File

@ -19,8 +19,10 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import java.util.Map;

View File

@ -31,7 +31,8 @@ import com.maxmind.geoip2.record.Subdivision;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.io.Closeable;
import java.io.IOException;
@ -55,8 +56,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty;
public final class GeoIpProcessor implements Processor {

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.io.BufferedReader;
import java.io.IOException;

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;
import java.util.regex.Matcher;

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.List;
import java.util.Map;

View File

@ -19,8 +19,10 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,9 +19,11 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Arrays;
import java.util.Map;

View File

@ -1,92 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
import org.elasticsearch.ingest.processor.FailProcessor;
import org.elasticsearch.ingest.processor.GeoIpProcessor;
import org.elasticsearch.ingest.processor.GrokProcessor;
import org.elasticsearch.ingest.processor.GsubProcessor;
import org.elasticsearch.ingest.processor.JoinProcessor;
import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import java.util.HashMap;
import java.util.Map;
public class IngestModule extends AbstractModule {
private final boolean ingestEnabled;
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders = new HashMap<>();
public IngestModule(boolean ingestEnabled) {
this.ingestEnabled = ingestEnabled;
}
@Override
protected void configure() {
// Even if ingest isn't enable we still need to make sure that rest requests with pipeline
// param copy the pipeline into the context, so that in IngestDisabledActionFilter
// index/bulk requests can be failed
binder().bind(IngestRestFilter.class).asEagerSingleton();
if (ingestEnabled) {
binder().bind(IngestBootstrapper.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory());
addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
}
}
}
/**
* Adds a processor factory under a specific type name.
*/
public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) {
processorFactoryProviders.put(type, processorFactoryProvider);
}
}

View File

@ -1,139 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
import org.elasticsearch.plugin.ingest.transport.IngestDisabledActionFilter;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineTransportAction;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptModule;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class IngestPlugin extends Plugin {
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
public static final String PIPELINE_ID_PARAM = "pipeline";
public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
public static final String NAME = "ingest";
public static final String NODE_INGEST_SETTING = "node.ingest";
private final Settings nodeSettings;
private final boolean ingestEnabled;
private final boolean transportClient;
public IngestPlugin(Settings nodeSettings) {
this.nodeSettings = nodeSettings;
this.ingestEnabled = nodeSettings.getAsBoolean(NODE_INGEST_SETTING, false);
this.transportClient = TransportClient.CLIENT_TYPE.equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING));
}
@Override
public String name() {
return NAME;
}
@Override
public String description() {
return "Plugin that allows to configure pipelines to preprocess documents before indexing";
}
@Override
public Collection<Module> nodeModules() {
if (transportClient) {
return Collections.emptyList();
} else {
return Collections.singletonList(new IngestModule(ingestEnabled));
}
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (transportClient|| ingestEnabled == false) {
return Collections.emptyList();
} else {
return Collections.singletonList(IngestBootstrapper.class);
}
}
@Override
public Settings additionalSettings() {
return settingsBuilder()
.put(PipelineExecutionService.additionalSettings(nodeSettings))
.build();
}
public void onModule(ActionModule module) {
if (transportClient == false) {
if (ingestEnabled) {
module.registerFilter(IngestActionFilter.class);
} else {
module.registerFilter(IngestDisabledActionFilter.class);
}
}
if (ingestEnabled) {
module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
module.registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
}
}
public void onModule(NetworkModule networkModule) {
if (transportClient) {
return;
}
if (ingestEnabled) {
networkModule.registerRestHandler(RestPutPipelineAction.class);
networkModule.registerRestHandler(RestGetPipelineAction.class);
networkModule.registerRestHandler(RestDeletePipelineAction.class);
networkModule.registerRestHandler(RestSimulatePipelineAction.class);
} else {
networkModule.registerRestHandler(RestIngestDisabledAction.class);
}
}
public void onModule(ScriptModule module) {
module.registerScriptContext(InternalTemplateService.INGEST_SCRIPT_CONTEXT);
}
}

View File

@ -1,37 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.processor.Processor;
/**
* The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some
* processors rely on reading files from the config directory. We can't add Environment as a constructor parameter,
* so we need some code that provides the physical location of the configuration directory to the processor factories
* that need this and this is what this processor factory provider does.
*/
@FunctionalInterface
interface ProcessorFactoryProvider {
Processor.Factory get(Environment environment, TemplateService templateService);
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.BytesRestResponse;
public class RestIngestDisabledAction extends BaseRestHandler {
@Inject
public RestIngestDisabledAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.DELETE, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.PUT, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
channel.sendResponse(new BytesRestResponse(channel, new IllegalArgumentException("ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used")));
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.ingest;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.RestTestCandidate;

View File

@ -19,7 +19,8 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -19,13 +19,12 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;

View File

@ -1,163 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument;
@Before
public void init() {
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
}
public void testEmpty() throws Exception {
CompoundProcessor processor = new CompoundProcessor();
assertThat(processor.getProcessors().isEmpty(), is(true));
assertThat(processor.getOnFailureProcessors().isEmpty(), is(true));
processor.execute(ingestDocument);
}
public void testSingleProcessor() throws Exception {
Processor processor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument);
verify(processor, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithException() throws Exception {
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("failed_processor");
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
try {
compoundProcessor.execute(ingestDocument);
fail("should throw exception");
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("error"));
}
verify(processor, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
Exception error = new RuntimeException("error");
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("first");
doThrow(error).doNothing().when(processor).execute(ingestDocument);
Processor processorNext = mock(Processor.class);
Answer checkMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
return null;
};
doAnswer(checkMetadataAnswer).when(processorNext).execute(ingestDocument);
CompoundProcessor compoundProcessor = spy(new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext)));
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext));
compoundProcessor.execute(ingestDocument);
verify(compoundProcessor).executeOnFailure(ingestDocument, error, "first");
verify(processor, times(1)).execute(ingestDocument);
verify(processorNext, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithNestedFailures() throws Exception {
Exception error = new RuntimeException("error");
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("first");
doThrow(error).doNothing().when(processor).execute(ingestDocument);
Processor processorToFail = mock(Processor.class);
Answer checkMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
return null;
};
doAnswer(checkMetadataAnswer).when(processorToFail).execute(ingestDocument);
when(processorToFail.getType()).thenReturn("second");
doThrow(error).doNothing().when(processorToFail).execute(ingestDocument);
Processor lastProcessor = mock(Processor.class);
Answer checkLastMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second"));
return null;
};
doAnswer(checkLastMetadataAnswer).when(lastProcessor).execute(ingestDocument);
CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor));
CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor);
CompoundProcessor innerCompoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(compoundOnFailProcessor));
CompoundProcessor compoundProcessor = spy(innerCompoundProcessor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor));
compoundProcessor.execute(ingestDocument);
verify(processor, times(1)).execute(ingestDocument);
verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument, error, "first");
verify(compoundOnFailProcessor, times(1)).execute(ingestDocument);
verify(processorToFail, times(1)).execute(ingestDocument);
verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument, error, "second");
verify(lastProcessor, times(1)).execute(ingestDocument);
}
}

View File

@ -19,10 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;

View File

@ -19,7 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTime;

Some files were not shown because too many files have changed in this diff Show More