wip: move all the ingest infra to core

Author: javanna, 2016-01-06 19:10:43 +01:00 (committed by Luca Cavanna)
Commit: 9079a7e891, parent: da3f460bd1
110 changed files with 490 additions and 657 deletions

View File

@ -155,6 +155,12 @@ import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptAction;
import org.elasticsearch.action.indexedscripts.get.TransportGetIndexedScriptAction;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptAction;
import org.elasticsearch.action.indexedscripts.put.TransportPutIndexedScriptAction;
import org.elasticsearch.action.ingest.delete.DeletePipelineAction;
import org.elasticsearch.action.ingest.delete.DeletePipelineTransportAction;
import org.elasticsearch.action.ingest.get.GetPipelineAction;
import org.elasticsearch.action.ingest.get.GetPipelineTransportAction;
import org.elasticsearch.action.ingest.put.PutPipelineAction;
import org.elasticsearch.action.ingest.put.PutPipelineTransportAction;
import org.elasticsearch.action.percolate.MultiPercolateAction;
import org.elasticsearch.action.percolate.PercolateAction;
import org.elasticsearch.action.percolate.TransportMultiPercolateAction;
@ -192,6 +198,11 @@ import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.action.ingest.IngestDisabledActionFilter;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineAction;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineTransportAction;
import java.util.ArrayList;
import java.util.HashMap;
@ -220,9 +231,11 @@ public class ActionModule extends AbstractModule {
}
private final boolean ingestEnabled;
private final boolean proxy;
public ActionModule(boolean proxy) {
public ActionModule(Settings settings, boolean proxy) {
this.ingestEnabled = settings.getAsBoolean("node.ingest", false);
this.proxy = proxy;
}
@ -349,6 +362,11 @@ public class ActionModule extends AbstractModule {
registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);
registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
// register Name -> GenericAction Map that can be injected to instances.
MapBinder<String, GenericAction> actionsBinder
= MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
@ -359,6 +377,11 @@ public class ActionModule extends AbstractModule {
// register GenericAction -> transportAction Map that can be injected to instances.
// also register any supporting classes
if (!proxy) {
if (ingestEnabled) {
registerFilter(IngestActionFilter.class);
} else {
registerFilter(IngestDisabledActionFilter.class);
}
bind(TransportLivenessAction.class).asEagerSingleton();
MapBinder<GenericAction, TransportAction> transportActionsBinder
= MapBinder.newMapBinder(binder(), GenericAction.class, TransportAction.class);
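Net effect of this hunk: ActionModule now takes the node Settings and picks between IngestActionFilter and IngestDisabledActionFilter based on the node.ingest flag, which defaults to false. A minimal sketch of the new constructor wiring, using only what the diff shows; the wrapper class and values below are illustrative:

import org.elasticsearch.action.ActionModule;
import org.elasticsearch.common.settings.Settings;

// Minimal sketch (not part of the commit): exercising the new ActionModule(Settings, boolean) constructor.
// The "node.ingest" key and its false default come from the hunk above; the wrapper class is illustrative.
public class ActionModuleWiringSketch {
    public static void main(String[] args) {
        Settings settings = Settings.builder()
                .put("node.ingest", true) // true -> IngestActionFilter, false/absent -> IngestDisabledActionFilter
                .build();
        ActionModule actionModule = new ActionModule(settings, false); // proxy = false, as Node passes further down
    }
}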

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -31,9 +31,9 @@ import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.tasks.Task;
import java.util.ArrayList;
@ -54,9 +54,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
@Override
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
String pipelineId = request.getFromContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId == null) {
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
pipelineId = request.getHeader(ConfigurationUtils.PIPELINE_ID_PARAM);
if (pipelineId == null) {
chain.proceed(task, action, request, listener);
return;
@ -84,7 +84,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
// The IndexRequest has the same type on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence this check
if (indexRequest.hasHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED)) {
if (indexRequest.hasHeader(ConfigurationUtils.PIPELINE_ALREADY_PROCESSED)) {
chain.proceed(task, action, indexRequest, listener);
return;
}
@ -92,7 +92,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
logger.error("failed to execute pipeline [{}]", t, pipelineId);
listener.onFailure(t);
}, success -> {
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(task, action, indexRequest, listener);
});
}
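The filter reads the pipeline id from the request context or header and, once the pipeline has run, stamps the request so a retried primary action does not execute it again (per the comment above). A minimal sketch of how a request gets tagged, mirroring the test code further down in this commit; the index, type, id and pipeline names are illustrative:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.ingest.core.ConfigurationUtils;

// Minimal sketch mirroring IngestActionFilterTests later in this commit; concrete values are illustrative.
public class PipelineHeaderSketch {
    public static void main(String[] args) {
        IndexRequest indexRequest = new IndexRequest("my-index", "my-type", "1");
        indexRequest.source("field", "value");
        // tells IngestActionFilter which pipeline to run:
        indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "my-pipeline");
        // what the filter sets after a successful run, so the primary action skips the pipeline:
        // indexRequest.putHeader(ConfigurationUtils.PIPELINE_ALREADY_PROCESSED, true);
    }
}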

View File

@ -16,25 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.ingest.core.ConfigurationUtils;
public final class IngestDisabledActionFilter implements ActionFilter {
@Override
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
String pipelineId = request.getFromContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId != null) {
failRequest(pipelineId);
}
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
pipelineId = request.getHeader(ConfigurationUtils.PIPELINE_ID_PARAM);
if (pipelineId != null) {
failRequest(pipelineId);
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest.delete;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.delete.DeleteResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest.delete;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest.delete;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.delete;
package org.elasticsearch.action.ingest.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteResponse;
@ -26,8 +26,8 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest.get;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest.get;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest.get;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,14 +17,14 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest.get;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.ingest.PipelineDefinition;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.get;
package org.elasticsearch.action.ingest.get;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
@ -25,9 +25,9 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineDefinition;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest.put;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.index.IndexResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest.put;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest.put;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.put;
package org.elasticsearch.action.ingest.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
@ -26,11 +26,12 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, IndexResponse> {
private final PipelineStore pipelineStore;

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.reload;
package org.elasticsearch.action.ingest.reload;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;

View File

@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import java.io.IOException;
import java.util.Collections;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@ -17,12 +17,12 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,17 +17,17 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.PipelineStore;
import java.io.IOException;
import java.util.ArrayList;
@ -36,7 +36,7 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.ingest.IngestDocument.MetaData;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData;
public class SimulatePipelineRequest extends ActionRequest {

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
@ -26,8 +26,8 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
@ -25,7 +25,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import java.io.IOException;
import java.util.Collections;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -25,7 +25,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import java.io.IOException;
import java.util.Collections;

View File

@ -147,7 +147,7 @@ public class TransportClient extends AbstractClient {
// noop
}
});
modules.add(new ActionModule(true));
modules.add(new ActionModule(this.settings, true));
modules.add(new CircuitBreakerModule(this.settings));
pluginsService.processModules(modules);

View File

@ -116,6 +116,10 @@ import org.elasticsearch.rest.action.get.RestGetSourceAction;
import org.elasticsearch.rest.action.get.RestHeadAction;
import org.elasticsearch.rest.action.get.RestMultiGetAction;
import org.elasticsearch.rest.action.index.RestIndexAction;
import org.elasticsearch.rest.action.ingest.RestDeletePipelineAction;
import org.elasticsearch.rest.action.ingest.RestGetPipelineAction;
import org.elasticsearch.rest.action.ingest.RestPutPipelineAction;
import org.elasticsearch.rest.action.ingest.RestSimulatePipelineAction;
import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.rest.action.percolate.RestMultiPercolateAction;
import org.elasticsearch.rest.action.percolate.RestPercolateAction;
@ -263,7 +267,13 @@ public class NetworkModule extends AbstractModule {
RestCatAction.class,
// Tasks API
RestListTasksAction.class
RestListTasksAction.class,
// Ingest API
RestPutPipelineAction.class,
RestGetPipelineAction.class,
RestDeletePipelineAction.class,
RestSimulatePipelineAction.class
);
private static final List<Class<? extends AbstractCatAction>> builtinCatHandlers = Arrays.asList(

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
@ -36,7 +36,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -164,7 +163,7 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
void installIngestIndexTemplate() throws IOException {
logger.debug("installing .ingest index template...");
try (InputStream is = IngestBootstrapper.class.getResourceAsStream("/ingest.json")) {
try (InputStream is = IngestBootstrapper.class.getResourceAsStream("ingest.json")) {
final byte[] template;
try (BytesStreamOutput out = new BytesStreamOutput()) {
Streams.copy(is, out);
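The resource path change above is subtle: with Class#getResourceAsStream, a leading slash resolves from the classpath root, while a bare name resolves relative to the class's own package, so ingest.json is now expected next to IngestBootstrapper on the classpath. This is standard Java behaviour rather than anything commit-specific; sketched below, wrapper class illustrative:

import java.io.InputStream;
import org.elasticsearch.ingest.IngestBootstrapper;

// Standard java.lang.Class#getResourceAsStream semantics (general Java behaviour):
public class ResourceLookupSketch {
    public static void main(String[] args) {
        // leading slash: resolved from the classpath root -> /ingest.json
        InputStream fromRoot = IngestBootstrapper.class.getResourceAsStream("/ingest.json");
        // no slash: resolved relative to the class's package -> /org/elasticsearch/ingest/ingest.json
        InputStream fromPackage = IngestBootstrapper.class.getResourceAsStream("ingest.json");
    }
}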

View File

@ -21,24 +21,29 @@ package org.elasticsearch.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.rest.action.ingest.IngestRestFilter;
import java.util.function.BiFunction;
/**
* Registry for processor factories
* @see org.elasticsearch.ingest.Processor.Factory
* @see Processor.Factory
*/
public class ProcessorsModule extends AbstractModule {
public class IngestModule extends AbstractModule {
private final ProcessorsRegistry processorsRegistry;
public ProcessorsModule() {
public IngestModule() {
this.processorsRegistry = new ProcessorsRegistry();
}
@Override
protected void configure() {
binder().bind(IngestRestFilter.class).asEagerSingleton();
bind(ProcessorsRegistry.class).toInstance(processorsRegistry);
binder().bind(IngestBootstrapper.class).asEagerSingleton();
}
/**

View File

@ -17,10 +17,10 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
@ -30,13 +30,11 @@ import org.elasticsearch.script.ScriptService;
import java.util.Collections;
import java.util.Map;
class InternalTemplateService implements TemplateService {
public static final ScriptContext.Plugin INGEST_SCRIPT_CONTEXT = new ScriptContext.Plugin("elasticsearch-ingest", "ingest");
public class InternalTemplateService implements TemplateService {
private final ScriptService scriptService;
InternalTemplateService(ScriptService scriptService) {
public InternalTemplateService(ScriptService scriptService) {
this.scriptService = scriptService;
}
@ -48,7 +46,7 @@ class InternalTemplateService implements TemplateService {
Script script = new Script(template, ScriptService.ScriptType.INLINE, "mustache", Collections.emptyMap());
CompiledScript compiledScript = scriptService.compile(
script,
INGEST_SCRIPT_CONTEXT,
ScriptContext.Standard.INGEST,
null /* we can supply null here, because ingest doesn't use indexed scripts */,
Collections.emptyMap()
);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -26,7 +26,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.core.Pipeline;
import java.io.IOException;

View File

@ -17,14 +17,12 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
@ -32,8 +30,6 @@ import java.util.function.Consumer;
public class PipelineExecutionService {
static final String THREAD_POOL_NAME = IngestPlugin.NAME;
private final PipelineStore store;
private final ThreadPool threadPool;
@ -44,7 +40,7 @@ public class PipelineExecutionService {
public void execute(IndexRequest request, String pipelineId, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(pipelineId);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
try {
innerExecute(request, pipeline);
completionHandler.accept(true);
@ -57,7 +53,7 @@ public class PipelineExecutionService {
public void execute(Iterable<ActionRequest> actionRequests, String pipelineId,
Consumer<Throwable> itemFailureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(pipelineId);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
for (ActionRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest) == false) {
continue;
@ -108,20 +104,4 @@ public class PipelineExecutionService {
}
return pipeline;
}
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// the TP is already configured in the node settings
// no need for additional settings
return Settings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings);
return Settings.builder()
.put("threadpool." + THREAD_POOL_NAME + ".type", "fixed")
.put("threadpool." + THREAD_POOL_NAME + ".size", availableProcessors)
.put("threadpool." + THREAD_POOL_NAME + ".queue_size", 200)
.build();
}
}
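With the plugin-owned thread pool gone, execution is dispatched onto the core ingest executor, and the additionalSettings method that used to size a private pool is removed; ThreadPool registers an equivalent fixed pool further down in this commit. A minimal sketch of the dispatch pattern, assuming an injected ThreadPool as in the class above:

import org.elasticsearch.threadpool.ThreadPool;

// Minimal sketch of the new dispatch pattern; the Runnable stands in for innerExecute(request, pipeline).
public class IngestDispatchSketch {
    static void dispatch(ThreadPool threadPool, Runnable pipelineWork) {
        threadPool.executor(ThreadPool.Names.INGEST).execute(pipelineWork);
    }
}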

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
@ -40,13 +40,12 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.plugin.ingest.transport.reload.ReloadPipelinesAction;
import org.elasticsearch.action.ingest.delete.DeletePipelineRequest;
import org.elasticsearch.action.ingest.put.PutPipelineRequest;
import org.elasticsearch.action.ingest.reload.ReloadPipelinesAction;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import java.util.HashMap;
import java.util.Map;

View File

@ -18,7 +18,7 @@
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.Arrays;
import java.util.Collections;

View File

@ -17,13 +17,17 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.List;
import java.util.Map;
public final class ConfigurationUtils {
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
public static final String PIPELINE_ID_PARAM = "pipeline";
public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
private ConfigurationUtils() {
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;

View File

@ -18,7 +18,7 @@
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.ArrayList;
import java.util.Arrays;

View File

@ -18,7 +18,7 @@
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.Map;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.Map;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import java.util.ArrayList;
import java.util.HashMap;

View File

@ -72,7 +72,7 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.ingest.ProcessorsModule;
import org.elasticsearch.ingest.IngestModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
@ -189,7 +189,7 @@ public class Node implements Releasable {
modules.add(new ClusterModule(this.settings));
modules.add(new IndicesModule());
modules.add(new SearchModule());
modules.add(new ActionModule(false));
modules.add(new ActionModule(this.settings, false));
modules.add(new GatewayModule(settings));
modules.add(new NodeClientModule());
modules.add(new PercolatorModule());
@ -197,7 +197,7 @@ public class Node implements Releasable {
modules.add(new RepositoriesModule());
modules.add(new TribeModule());
modules.add(new AnalysisModule(environment));
modules.add(new ProcessorsModule());
modules.add(new IngestModule());
pluginsService.processModules(modules);

View File

@ -17,10 +17,10 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestFilter;
@ -36,8 +36,8 @@ public class IngestRestFilter extends RestFilter {
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
if (request.hasParam(IngestPlugin.PIPELINE_ID_PARAM)) {
request.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(IngestPlugin.PIPELINE_ID_PARAM));
if (request.hasParam(ConfigurationUtils.PIPELINE_ID_PARAM)) {
request.putInContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(ConfigurationUtils.PIPELINE_ID_PARAM));
}
filterChain.continueProcessing(request, channel);
}

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.action.ingest.delete.DeletePipelineAction;
import org.elasticsearch.action.ingest.delete.DeletePipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -17,14 +17,14 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest;
import org.elasticsearch.action.ingest.get.GetPipelineAction;
import org.elasticsearch.action.ingest.get.GetPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.action.ingest.put.PutPipelineAction;
import org.elasticsearch.action.ingest.put.PutPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -17,13 +17,13 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineAction;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;

View File

@ -37,7 +37,7 @@ public interface ScriptContext {
*/
enum Standard implements ScriptContext {
AGGS("aggs"), SEARCH("search"), UPDATE("update");
AGGS("aggs"), SEARCH("search"), UPDATE("update"), INGEST("ingest");
private final String key;

View File

@ -87,6 +87,7 @@ public class ThreadPool extends AbstractComponent {
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
public static final String INGEST = "ingest";
}
public enum ThreadPoolType {
@ -145,6 +146,7 @@ public class ThreadPool extends AbstractComponent {
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
map.put(Names.INGEST, ThreadPoolType.FIXED);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}
@ -234,6 +236,7 @@ public class ThreadPool extends AbstractComponent {
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INGEST).size(availableProcessors).queueSize(200));
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);
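The new core pool mirrors the defaults the plugin used to build for itself: a fixed pool sized to the bounded number of processors with a queue of 200. Being a standard pool, it should be tunable through the usual threadpool.* node settings, the same keys the removed PipelineExecutionService.additionalSettings emitted. A hedged sketch, with illustrative values:

import org.elasticsearch.common.settings.Settings;

// Hedged sketch: overriding the core ingest pool via node settings. The "threadpool.ingest.*" keys follow
// the same pattern the removed additionalSettings method was writing; the numbers are illustrative.
public class IngestPoolSettingsSketch {
    public static void main(String[] args) {
        Settings settings = Settings.builder()
                .put("threadpool.ingest.size", 4)         // default: bounded number of processors
                .put("threadpool.ingest.queue_size", 500) // default: 200, per the registration above
                .build();
    }
}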

View File

@ -1,4 +1,4 @@
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
/*
* Licensed to Elasticsearch under one or more contributor

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -29,32 +29,32 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.Matchers;
import org.mockito.stubbing.Answer;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.BulkRequestModifier;
import static org.elasticsearch.action.ingest.IngestActionFilter.BulkRequestModifier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -89,13 +89,13 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(executionService).execute(Matchers.any(IndexRequest.class), Matchers.eq("_id"), Matchers.any(Consumer.class), Matchers.any(Consumer.class));
verifyZeroInteractions(actionFilterChain);
}
@ -103,13 +103,13 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
indexRequest.putInContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(executionService).execute(Matchers.any(IndexRequest.class), Matchers.eq("_id"), Matchers.any(Consumer.class), Matchers.any(Consumer.class));
verifyZeroInteractions(actionFilterChain);
}
@ -117,8 +117,8 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ALREADY_PROCESSED, true);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -132,7 +132,7 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -154,23 +154,20 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
RuntimeException exception = new RuntimeException();
Answer answer = new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Consumer<Throwable> handler = (Consumer) invocationOnMock.getArguments()[2];
handler.accept(exception);
return null;
}
Answer answer = invocationOnMock -> {
Consumer<Throwable> handler = (Consumer) invocationOnMock.getArguments()[2];
handler.accept(exception);
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(executionService).execute(Matchers.any(IndexRequest.class), Matchers.eq("_id"), Matchers.any(Consumer.class), Matchers.any(Consumer.class));
verify(actionListener).onFailure(exception);
verifyZeroInteractions(actionFilterChain);
}
@ -199,7 +196,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
bulkRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
if (rarely()) {

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.reload;
package org.elasticsearch.action.ingest.reload;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
@ -27,20 +27,19 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.Matchers;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -83,7 +82,7 @@ public class ReloadPipelinesActionTests extends ESTestCase {
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
}
return mock;
}).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any());
}).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any());
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(true)));
}
@ -115,7 +114,7 @@ public class ReloadPipelinesActionTests extends ESTestCase {
}
}
return mock;
}).when(transportService).sendRequest(any(), eq(ReloadPipelinesAction.ACTION_NAME), any(), any());
}).when(transportService).sendRequest(Matchers.any(), Matchers.eq(ReloadPipelinesAction.ACTION_NAME), Matchers.any(), Matchers.any());
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false)));
}

View File

@ -17,11 +17,11 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -17,13 +17,14 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
@ -34,18 +35,11 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class SimulateExecutionServiceTests extends ESTestCase {
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private Pipeline pipeline;
private CompoundProcessor processor;
private IngestDocument ingestDocument;
@Before
@ -56,9 +50,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
.build()
);
executionService = new SimulateExecutionService(threadPool);
processor = mock(CompoundProcessor.class);
when(processor.getType()).thenReturn("mock");
pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
}
@ -68,8 +59,10 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
verify(processor, times(2)).execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
@ -89,8 +82,10 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
verify(processor, times(2)).execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
assertThat(simulateDocumentSimpleResult.getIngestDocument(), equalTo(ingestDocument));
@ -98,10 +93,12 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).doNothing().when(processor).execute(ingestDocument);
TestProcessor processor1 = new TestProcessor("mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
verify(processor, times(2)).execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
@ -114,15 +111,13 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocument));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
}
public void testExecuteItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).when(processor).execute(ingestDocument);
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
verify(processor, times(1)).execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
assertThat(simulateDocumentSimpleResult.getIngestDocument(), nullValue());
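These tests now assert on a TestProcessor invocation counter instead of verifying a Mockito mock. TestProcessor's source is not part of this diff; a hypothetical sketch of the shape the tests assume (a type string, a Consumer<IngestDocument> body, and an invocation counter) could look like this:

import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.util.function.Consumer;

// Hypothetical sketch only: the real TestProcessor lives in the test sources and is not shown in this diff.
// The assumed Processor contract (execute + getType) is inferred from the calls visible elsewhere in the commit.
public class TestProcessor implements Processor {
    private final String type;
    private final Consumer<IngestDocument> body;
    private int invokedCounter;

    public TestProcessor(Consumer<IngestDocument> body) {
        this("mock", body);
    }

    public TestProcessor(String type, Consumer<IngestDocument> body) {
        this.type = type;
        this.body = body;
    }

    @Override
    public void execute(IngestDocument ingestDocument) {
        invokedCounter++;
        body.accept(ingestDocument);
    }

    @Override
    public String getType() {
        return type;
    }

    public int getInvokedCounter() {
        return invokedCounter;
    }
}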

View File

@ -17,29 +17,29 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.ingest.IngestDocument.MetaData.ID;
import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX;
import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE;
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
import static org.elasticsearch.action.ingest.simulate.SimulatePipelineRequest.Fields;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.ID;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.INDEX;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.TYPE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
@ -51,8 +51,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
@Before
public void init() throws IOException {
CompoundProcessor pipelineCompoundProcessor = mock(CompoundProcessor.class);
when(pipelineCompoundProcessor.getProcessors()).thenReturn(Arrays.asList(mock(Processor.class)));
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mock_processor", mock(Processor.Factory.class));

View File

@ -17,11 +17,11 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -17,11 +17,11 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -17,11 +17,11 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.action.ingest.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
@ -46,6 +46,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.Matchers;
import java.util.ArrayList;
import java.util.Collections;
@ -54,8 +55,6 @@ import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -71,7 +70,7 @@ public class IngestBootstrapperTests extends ESTestCase {
@Before
public void init() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(any())).thenReturn(Runnable::run);
when(threadPool.executor(Matchers.any())).thenReturn(Runnable::run);
ClusterService clusterService = mock(ClusterService.class);
store = mock(PipelineStore.class);
when(store.isStarted()).thenReturn(false);
@ -85,8 +84,8 @@ public class IngestBootstrapperTests extends ESTestCase {
TransportService transportService = mock(TransportService.class);
ClusterService clusterService = mock(ClusterService.class);
when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
when(client.searchScroll(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
when(client.search(Matchers.any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
when(client.searchScroll(Matchers.any())).thenReturn(PipelineStoreTests.expectedSearchReponse(Collections.emptyList()));
Settings settings = Settings.EMPTY;
PipelineStore store = new PipelineStore(settings, clusterService, transportService);
IngestBootstrapper bootstrapper = new IngestBootstrapper(
@ -98,8 +97,8 @@ public class IngestBootstrapperTests extends ESTestCase {
hits.add(new InternalSearchHit(0, "1", new Text("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(client.search(any())).thenReturn(PipelineStoreTests.expectedSearchReponse(hits));
when(client.get(any())).thenReturn(PipelineStoreTests.expectedGetResponse(true));
when(client.search(Matchers.any())).thenReturn(PipelineStoreTests.expectedSearchReponse(hits));
when(client.get(Matchers.any())).thenReturn(PipelineStoreTests.expectedGetResponse(true));
try {
store.get("1");
@ -146,7 +145,7 @@ public class IngestBootstrapperTests extends ESTestCase {
ClusterState cs = csBuilder.metaData(MetaData.builder()).build();
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, never()).stop(anyString());
verify(store, never()).stop(Matchers.anyString());
}
public void testPipelineStoreBootstrappingGlobalStateNoMasterBlock() throws Exception {
@ -161,13 +160,13 @@ public class IngestBootstrapperTests extends ESTestCase {
// We're not started and there is a no master block, doing nothing:
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, never()).stop(anyString());
verify(store, never()).stop(Matchers.anyString());
// We're started and there is a no master block, so we stop the store:
when(store.isStarted()).thenReturn(true);
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, times(1)).stop(anyString());
verify(store, times(1)).stop(Matchers.anyString());
}
public void testPipelineStoreBootstrappingNoIngestIndex() throws Exception {
@ -203,13 +202,13 @@ public class IngestBootstrapperTests extends ESTestCase {
// We're not running and the cluster state isn't ready, so we don't start.
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, never()).stop(anyString());
verify(store, never()).stop(Matchers.anyString());
// We're running and the cluster state indicates that all our shards are unassigned, so we stop.
when(store.isStarted()).thenReturn(true);
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, never()).start();
verify(store, times(1)).stop(anyString());
verify(store, times(1)).stop(Matchers.anyString());
}
public void testPipelineStoreBootstrappingIngestIndexShardsStarted() throws Exception {
@ -236,13 +235,13 @@ public class IngestBootstrapperTests extends ESTestCase {
// We're not running and the cluster state is ready, so we start.
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, times(1)).start();
verify(store, never()).stop(anyString());
verify(store, never()).stop(Matchers.anyString());
// We're running and the cluster state is good, so we do nothing.
when(store.isStarted()).thenReturn(true);
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, times(1)).start();
verify(store, never()).stop(anyString());
verify(store, never()).stop(Matchers.anyString());
}
public void testPipelineStoreBootstrappingFailure() throws Exception {
@ -270,7 +269,7 @@ public class IngestBootstrapperTests extends ESTestCase {
doThrow(new RuntimeException()).doNothing().when(store).start();
bootstrapper.clusterChanged(new ClusterChangedEvent("test", cs, cs));
verify(store, times(2)).start();
verify(store, never()).stop(anyString());
verify(store, never()).stop(Matchers.anyString());
}
}

View File

@ -25,35 +25,35 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.delete.DeletePipelineAction;
import org.elasticsearch.action.ingest.delete.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.get.GetPipelineAction;
import org.elasticsearch.action.ingest.get.GetPipelineRequestBuilder;
import org.elasticsearch.action.ingest.get.GetPipelineResponse;
import org.elasticsearch.action.ingest.put.PutPipelineAction;
import org.elasticsearch.action.ingest.put.PutPipelineRequestBuilder;
import org.elasticsearch.action.ingest.simulate.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineAction;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineRequestBuilder;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateDocumentSimpleResult;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
@ -73,7 +73,7 @@ public class IngestClientIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(IngestPlugin.NODE_INGEST_SETTING, true)
.put("node.ingest", true)
.build();
}
@ -81,7 +81,7 @@ public class IngestClientIT extends ESIntegTestCase {
protected Settings externalClusterClientSettings() {
return Settings.builder()
.put(super.transportClientSettings())
.put(IngestPlugin.NODE_INGEST_SETTING, true)
.put("node.ingest", true)
.build();
}
@ -92,9 +92,7 @@ public class IngestClientIT extends ESIntegTestCase {
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("grok")
.field("field", "field1")
.field("pattern", "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>")
.startObject("test")
.endObject()
.endObject()
.endArray()
@ -128,35 +126,9 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(response.getResults().size(), equalTo(1));
assertThat(response.getResults().get(0), instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
assertThat(simulateDocumentSimpleResult.getIngestDocument(), nullValue());
assertThat(simulateDocumentSimpleResult.getFailure(), notNullValue());
response = new SimulatePipelineRequestBuilder(client(), SimulatePipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
.startArray("docs")
.startObject()
.field("_index", "index")
.field("_type", "type")
.field("_id", "id")
.startObject("_source")
.field("field1", "123.42 400 <foo>")
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
assertThat(response.isVerbose(), equalTo(false));
assertThat(response.getPipelineId(), equalTo("_id"));
assertThat(response.getResults().size(), equalTo(1));
assertThat(response.getResults().get(0), instanceOf(SimulateDocumentSimpleResult.class));
simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
Map<String, Object> source = new HashMap<>();
source.put("field1", "123.42 400 <foo>");
source.put("val", 123.42f);
source.put("status", 400);
source.put("msg", "foo");
source.put("foo", "bar");
source.put("processed", true);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
assertThat(simulateDocumentSimpleResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
@ -182,7 +154,7 @@ public class IngestClientIT extends ESIntegTestCase {
int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
bulkRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
if (i % 2 == 0) {
@ -240,7 +212,7 @@ public class IngestClientIT extends ESIntegTestCase {
assertAcked(putMappingResponse);
client().prepareIndex("test", "type", "1").setSource("field1", "123.42 400 <foo>")
.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id")
.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id")
.get();
Map<String, Object> doc = client().prepareGet("test", "type", "1")
@ -251,7 +223,7 @@ public class IngestClientIT extends ESIntegTestCase {
client().prepareBulk().add(
client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 <foo>")
).putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id").get();
).putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id").get();
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
assertThat(doc.get("val"), equalTo(123.42));
assertThat(doc.get("status"), equalTo(400));
@ -274,4 +246,23 @@ public class IngestClientIT extends ESIntegTestCase {
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Collections.emptyList();
}
public static class IngestPlugin extends Plugin {
@Override
public String name() {
return "ingest";
}
@Override
public String description() {
return "ingest mock";
}
public void onModule(IngestModule ingestModule) {
ingestModule.addProcessor("test", (environment, templateService) -> config ->
new TestProcessor("test", ingestDocument -> ingestDocument.setFieldValue("processed", true))
);
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -25,13 +25,15 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.mockito.Mockito;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class IngestTemplateTests extends ESSingleNodeTestCase {
@ -44,11 +46,11 @@ public class IngestTemplateTests extends ESSingleNodeTestCase {
@Before
public void init() {
ThreadPool threadPool = Mockito.mock(ThreadPool.class);
Mockito.when(threadPool.executor(Mockito.anyString())).thenReturn(Runnable::run);
Environment environment = Mockito.mock(Environment.class);
ClusterService clusterService = Mockito.mock(ClusterService.class);
TransportService transportService = Mockito.mock(TransportService.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(Runnable::run);
Environment environment = mock(Environment.class);
ClusterService clusterService = mock(ClusterService.class);
TransportService transportService = mock(TransportService.class);
bootstrapper = new IngestBootstrapper(
Settings.EMPTY, threadPool, environment, clusterService, transportService, new ProcessorsRegistry()
);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
@ -26,12 +26,10 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
@ -39,17 +37,12 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -67,14 +60,16 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void setup() {
store = mock(PipelineStore.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(Runnable::run);
when(threadPool.executor(Matchers.anyString())).thenReturn(Runnable::run);
executionService = new PipelineExecutionService(store, threadPool);
}
public void testExecutePipelineDoesNotExist() {
when(store.get("_id")).thenReturn(null);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
try {
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
@ -82,8 +77,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist"));
}
verify(failureHandler, never()).accept(any(Throwable.class));
verify(completionHandler, never()).accept(anyBoolean());
verify(failureHandler, never()).accept(Matchers.any(Throwable.class));
verify(completionHandler, never()).accept(Matchers.anyBoolean());
}
public void testExecuteSuccess() throws Exception {
@ -91,12 +86,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
//TODO we remove metadata, this check is not valid anymore, what do we replace it with?
//verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, never()).accept(any());
verify(failureHandler, never()).accept(Matchers.any());
verify(completionHandler, times(1)).accept(true);
}
@ -113,15 +108,17 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}
return null;
}).when(processor).execute(any());
}).when(processor).execute(Matchers.any());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(any());
verify(failureHandler, never()).accept(any());
verify(processor).execute(Matchers.any());
verify(failureHandler, never()).accept(Matchers.any());
verify(completionHandler, times(1)).accept(true);
assertThat(indexRequest.index(), equalTo("update_index"));
@ -138,12 +135,14 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
verify(failureHandler, times(1)).accept(Matchers.any(RuntimeException.class));
verify(completionHandler, never()).accept(Matchers.anyBoolean());
}
public void testExecuteSuccessWithOnFailure() throws Exception {
@ -153,12 +152,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
//TODO we remove metadata, this check is not valid anymore, what do we replace it with?
//verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, never()).accept(any(RuntimeException.class));
verify(failureHandler, never()).accept(Matchers.any(RuntimeException.class));
verify(completionHandler, times(1)).accept(true);
}
@ -170,12 +169,14 @@ public class PipelineExecutionServiceTests extends ESTestCase {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
verify(failureHandler, times(1)).accept(Matchers.any(RuntimeException.class));
verify(completionHandler, never()).accept(Matchers.anyBoolean());
}
public void testExecuteFailureWithNestedOnFailure() throws Exception {
@ -183,66 +184,65 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor onFailureProcessor = mock(Processor.class);
Processor onFailureOnFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(Arrays.asList(onFailureProcessor),Arrays.asList(onFailureOnFailureProcessor))));
Collections.singletonList(new CompoundProcessor(Collections.singletonList(onFailureProcessor), Collections.singletonList(onFailureOnFailureProcessor))));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
verify(failureHandler, times(1)).accept(Matchers.any(RuntimeException.class));
verify(completionHandler, never()).accept(Matchers.anyBoolean());
}
@SuppressWarnings("unchecked")
public void testExecuteTTL() throws Exception {
// test with valid ttl
SetProcessor.Factory metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance());
Map<String, Object> config = new HashMap<>();
config.put("field", "_ttl");
config.put("value", "5d");
Processor processor = metaProcessorFactory.create(config);
public void testExecuteSetTTL() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "5d"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl")));
verify(failureHandler, never()).accept(any());
verify(failureHandler, never()).accept(Matchers.any());
verify(completionHandler, times(1)).accept(true);
}
// test with invalid ttl
metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance());
config = new HashMap<>();
config.put("field", "_ttl");
config.put("value", "abc");
processor = metaProcessorFactory.create(config);
public void testExecuteSetInvalidTTL() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "abc"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
failureHandler = mock(Consumer.class);
completionHandler = mock(Consumer.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(failureHandler, times(1)).accept(any(ElasticsearchParseException.class));
verify(completionHandler, never()).accept(anyBoolean());
verify(failureHandler, times(1)).accept(Matchers.any(ElasticsearchParseException.class));
verify(completionHandler, never()).accept(Matchers.anyBoolean());
}
// test with provided ttl
public void testExecuteProvidedTTL() throws Exception {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", mock(CompoundProcessor.class)));
indexRequest = new IndexRequest("_index", "_type", "_id")
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap())
.ttl(1000L);
failureHandler = mock(Consumer.class);
completionHandler = mock(Consumer.class);
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
assertThat(indexRequest.ttl(), equalTo(new TimeValue(1000L)));
verify(failureHandler, never()).accept(any());
verify(failureHandler, never()).accept(Matchers.any());
verify(completionHandler, times(1)).accept(true);
}
@ -272,7 +272,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
CompoundProcessor processor = mock(CompoundProcessor.class);
Exception error = new RuntimeException();
doThrow(error).when(processor).execute(any());
doThrow(error).when(processor).execute(Matchers.any());
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class);
@ -296,11 +296,13 @@ public class PipelineExecutionServiceTests extends ESTestCase {
String pipelineId = "_id";
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor()));
@SuppressWarnings("unchecked")
Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(bulkRequest.requests(), pipelineId, requestItemErrorHandler, completionHandler);
verify(requestItemErrorHandler, never()).accept(any());
verify(requestItemErrorHandler, never()).accept(Matchers.any());
verify(completionHandler, times(1)).accept(true);
}
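
Note (editor's sketch, not part of the commit): the eqID(...) helper used by the verify(...) calls in the PipelineExecutionServiceTests hunks above is presumably defined in a part of that test class this diff does not show. Assuming it is a plain Mockito matcher built from org.mockito.Matchers.argThat and org.mockito.ArgumentMatcher (both of which appear in the file's import list above), a minimal sketch might look like this; the metadata keys and the containsAll check on the source are assumptions:

private static IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {
    // Hypothetical sketch only; the real matcher may compare the document differently.
    return Matchers.argThat(new ArgumentMatcher<IngestDocument>() {
        @Override
        public boolean matches(Object argument) {
            if (argument instanceof IngestDocument == false) {
                return false;
            }
            // Compare the expected metadata fields and require the expected source entries to be present.
            Map<String, Object> sourceAndMetadata = ((IngestDocument) argument).getSourceAndMetadata();
            return Objects.equals(index, sourceAndMetadata.get("_index"))
                    && Objects.equals(type, sourceAndMetadata.get("_type"))
                    && Objects.equals(id, sourceAndMetadata.get("_id"))
                    && sourceAndMetadata.entrySet().containsAll(source.entrySet());
        }
    });
}
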

View File

@ -19,6 +19,8 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.ingest;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.get.GetRequest;
@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -64,8 +63,8 @@ public class PipelineStoreTests extends ESTestCase {
TransportService transportService = mock(TransportService.class);
client = mock(Client.class);
when(client.search(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
when(client.search(Matchers.any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
when(client.searchScroll(Matchers.any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
store = new PipelineStore(settings, clusterService, transportService);
store.setClient(client);
store.start();
@ -77,15 +76,15 @@ public class PipelineStoreTests extends ESTestCase {
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(client.search(any())).thenReturn(expectedSearchReponse(hits));
when(client.get(any())).thenReturn(expectedGetResponse(true));
when(client.search(Matchers.any())).thenReturn(expectedSearchReponse(hits));
when(client.get(Matchers.any())).thenReturn(expectedGetResponse(true));
assertThat(store.get("1"), nullValue());
store.updatePipelines();
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
when(client.get(any())).thenReturn(expectedGetResponse(true));
when(client.get(Matchers.any())).thenReturn(expectedGetResponse(true));
hits.add(new InternalSearchHit(0, "2", new Text("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
);
@ -109,7 +108,7 @@ public class PipelineStoreTests extends ESTestCase {
hits.add(new InternalSearchHit(0, "foo", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "bar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "foobar", new Text("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
when(client.search(any())).thenReturn(expectedSearchReponse(hits));
when(client.search(Matchers.any())).thenReturn(expectedSearchReponse(hits));
store.updatePipelines();
List<PipelineDefinition> result = store.getReference("foo");

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.test.ESTestCase;
import java.util.Map;

View File

@ -17,8 +17,9 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

View File

@ -17,8 +17,10 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

View File

@ -17,8 +17,10 @@
* under the License.
*/
package org.elasticsearch.ingest;
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,11 +19,11 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.ArrayList;
import java.util.List;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;

View File

@ -19,10 +19,10 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import java.util.Map;

View File

@ -31,8 +31,8 @@ import com.maxmind.geoip2.record.Subdivision;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.io.Closeable;
import java.io.IOException;
@ -56,8 +56,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty;
public final class GeoIpProcessor implements Processor {

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.io.BufferedReader;
import java.io.IOException;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;
import java.util.regex.Matcher;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.List;
import java.util.Map;

View File

@ -19,10 +19,10 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,11 +19,11 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Map;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
import java.util.Arrays;
import java.util.Map;

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
public class IngestModule extends AbstractModule {
private final boolean ingestEnabled;
public IngestModule(boolean ingestEnabled) {
this.ingestEnabled = ingestEnabled;
}
@Override
protected void configure() {
// Even if ingest isn't enabled we still need to make sure that rest requests with pipeline
// param copy the pipeline into the context, so that in IngestDisabledActionFilter
// index/bulk requests can be failed
binder().bind(IngestRestFilter.class).asEagerSingleton();
if (ingestEnabled) {
binder().bind(IngestBootstrapper.class).asEagerSingleton();
}
}
}

View File

@ -19,14 +19,8 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.ProcessorsModule;
import org.elasticsearch.ingest.IngestModule;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.ConvertProcessor;
import org.elasticsearch.ingest.processor.DateProcessor;
@ -42,45 +36,16 @@ import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction;
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
import org.elasticsearch.plugin.ingest.transport.IngestDisabledActionFilter;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineTransportAction;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptModule;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class IngestPlugin extends Plugin {
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
public static final String PIPELINE_ID_PARAM = "pipeline";
public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
public static final String NAME = "ingest";
public static final String NODE_INGEST_SETTING = "node.ingest";
private final Settings nodeSettings;
private final boolean ingestEnabled;
private final boolean transportClient;
public IngestPlugin(Settings nodeSettings) {
this.nodeSettings = nodeSettings;
this.ingestEnabled = nodeSettings.getAsBoolean(NODE_INGEST_SETTING, false);
this.transportClient = TransportClient.CLIENT_TYPE.equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING));
this.ingestEnabled = nodeSettings.getAsBoolean("node.ingest", false);
}
@Override
@ -90,86 +55,26 @@ public class IngestPlugin extends Plugin {
@Override
public String description() {
return "Plugin that allows to configure pipelines to preprocess documents before indexing";
return "Plugin that allows to plug in ingest processors";
}
@Override
public Collection<Module> nodeModules() {
if (transportClient) {
return Collections.emptyList();
} else {
return Collections.singletonList(new IngestModule(ingestEnabled));
}
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (transportClient || ingestEnabled == false) {
return Collections.emptyList();
} else {
return Collections.singletonList(IngestBootstrapper.class);
}
}
@Override
public Settings additionalSettings() {
return settingsBuilder()
.put(PipelineExecutionService.additionalSettings(nodeSettings))
.build();
}
public void onModule(ProcessorsModule processorsModule) {
public void onModule(IngestModule ingestModule) {
if (ingestEnabled) {
processorsModule.addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
processorsModule.addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
processorsModule.addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
processorsModule.addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
processorsModule.addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
processorsModule.addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
processorsModule.addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
processorsModule.addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
processorsModule.addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
processorsModule.addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
processorsModule.addProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
processorsModule.addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
processorsModule.addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
processorsModule.addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
processorsModule.addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
ingestModule.addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
ingestModule.addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
ingestModule.addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
ingestModule.addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
ingestModule.addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
ingestModule.addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
ingestModule.addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
ingestModule.addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
ingestModule.addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
ingestModule.addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
ingestModule.addProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
ingestModule.addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
ingestModule.addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
ingestModule.addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
ingestModule.addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
}
}
public void onModule(ActionModule module) {
if (transportClient == false) {
if (ingestEnabled) {
module.registerFilter(IngestActionFilter.class);
} else {
module.registerFilter(IngestDisabledActionFilter.class);
}
}
if (ingestEnabled) {
module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
module.registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
}
}
public void onModule(NetworkModule networkModule) {
if (transportClient) {
return;
}
if (ingestEnabled) {
networkModule.registerRestHandler(RestPutPipelineAction.class);
networkModule.registerRestHandler(RestGetPipelineAction.class);
networkModule.registerRestHandler(RestDeletePipelineAction.class);
networkModule.registerRestHandler(RestSimulatePipelineAction.class);
} else {
networkModule.registerRestHandler(RestIngestDisabledAction.class);
}
}
public void onModule(ScriptModule module) {
module.registerScriptContext(InternalTemplateService.INGEST_SCRIPT_CONTEXT);
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.BytesRestResponse;
public class RestIngestDisabledAction extends BaseRestHandler {
@Inject
public RestIngestDisabledAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.DELETE, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.PUT, "/_ingest/pipeline/{id}", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
channel.sendResponse(new BytesRestResponse(channel, new IllegalArgumentException("ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used")));
}
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

View File

@ -19,12 +19,12 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;

View File

@ -19,7 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTime;

View File

@ -19,8 +19,8 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;

View File

@ -20,9 +20,8 @@
package org.elasticsearch.ingest.processor;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.GeoIpProcessor;
import org.elasticsearch.test.ESTestCase;
import java.io.InputStream;

View File

@ -19,10 +19,8 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Grok;
import org.elasticsearch.ingest.processor.GrokProcessor;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;

View File

@ -19,10 +19,10 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;

Some files were not shown because too many files have changed in this diff.
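
Note (editor's sketch, not part of the commit): several of the new tests above rely on a TestProcessor helper (for example in SimulatePipelineRequestParsingTests, IngestClientIT and PipelineExecutionServiceTests) whose source file is among those not shown in this truncated diff. Assuming the helper simply wraps a Consumer<IngestDocument>, and assuming the Processor interface in this codebase exposes just execute(IngestDocument) and getType(), a minimal sketch could look like the following; the package, the default type name and the constructor shapes are inferred from the call sites above and are not authoritative:

package org.elasticsearch.ingest;

import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;

import java.util.function.Consumer;

public class TestProcessor implements Processor {

    private final String type;
    private final Consumer<IngestDocument> consumer;

    public TestProcessor(Consumer<IngestDocument> consumer) {
        this("test", consumer);
    }

    public TestProcessor(String type, Consumer<IngestDocument> consumer) {
        this.type = type;
        this.consumer = consumer;
    }

    @Override
    public void execute(IngestDocument ingestDocument) {
        // Run the side effect (or throw the exception) supplied by the test.
        consumer.accept(ingestDocument);
    }

    @Override
    public String getType() {
        return type;
    }
}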