Ingest: Moved ingest invocation into index/bulk actions (#22015)

* Ingest: Moved ingest invocation into index/bulk actions

Ingest was originally setup as a plugin, and in order to hook into the
index and bulk actions, action filters were used. However, ingest was
later moved into core, but the action filters were never removed. This
change moves the execution of ingest into the index and bulk actions.

* Address PR comments

* Remove forwarder direct dependency on ClusterService
This commit is contained in:
Ryan Ernst 2016-12-07 08:43:26 -08:00 committed by GitHub
parent 8006b105f3
commit f02a2b6546
16 changed files with 772 additions and 921 deletions

View File

@ -170,8 +170,6 @@ import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineTransportAction; import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
import org.elasticsearch.action.ingest.GetPipelineAction; import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineTransportAction; import org.elasticsearch.action.ingest.GetPipelineTransportAction;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.action.ingest.IngestProxyActionFilter;
import org.elasticsearch.action.ingest.PutPipelineAction; import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.action.ingest.SimulatePipelineAction; import org.elasticsearch.action.ingest.SimulatePipelineAction;
@ -334,13 +332,13 @@ public class ActionModule extends AbstractModule {
private final DestructiveOperations destructiveOperations; private final DestructiveOperations destructiveOperations;
private final RestController restController; private final RestController restController;
public ActionModule(boolean ingestEnabled, boolean transportClient, Settings settings, IndexNameExpressionResolver resolver, public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver resolver,
ClusterSettings clusterSettings, ThreadPool threadPool, List<ActionPlugin> actionPlugins) { ClusterSettings clusterSettings, ThreadPool threadPool, List<ActionPlugin> actionPlugins) {
this.transportClient = transportClient; this.transportClient = transportClient;
this.settings = settings; this.settings = settings;
this.actionPlugins = actionPlugins; this.actionPlugins = actionPlugins;
actions = setupActions(actionPlugins); actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins, ingestEnabled); actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, resolver); autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, resolver);
destructiveOperations = new DestructiveOperations(settings, clusterSettings); destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet()); Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
@ -477,20 +475,8 @@ public class ActionModule extends AbstractModule {
return unmodifiableMap(actions.getRegistry()); return unmodifiableMap(actions.getRegistry());
} }
private List<Class<? extends ActionFilter>> setupActionFilters(List<ActionPlugin> actionPlugins, boolean ingestEnabled) { private List<Class<? extends ActionFilter>> setupActionFilters(List<ActionPlugin> actionPlugins) {
List<Class<? extends ActionFilter>> filters = new ArrayList<>(); return unmodifiableList(actionPlugins.stream().flatMap(p -> p.getActionFilters().stream()).collect(Collectors.toList()));
if (transportClient == false) {
if (ingestEnabled) {
filters.add(IngestActionFilter.class);
} else {
filters.add(IngestProxyActionFilter.class);
}
}
for (ActionPlugin plugin : actionPlugins) {
filters.addAll(plugin.getActionFilters());
}
return unmodifiableList(filters);
} }
static Set<Class<? extends RestHandler>> setupRestHandlers(List<ActionPlugin> actionPlugins) { static Set<Class<? extends RestHandler>> setupRestHandlers(List<ActionPlugin> actionPlugins) {

View File

@ -19,8 +19,25 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.RoutingMissingException;
@ -30,6 +47,7 @@ import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
@ -48,23 +66,12 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
/** /**
* Groups bulk request items by shard, optionally creating non-existent indices and * Groups bulk request items by shard, optionally creating non-existent indices and
* delegates to {@link TransportShardBulkAction} for shard-level bulk execution * delegates to {@link TransportShardBulkAction} for shard-level bulk execution
@ -74,35 +81,41 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final AutoCreateIndex autoCreateIndex; private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration; private final boolean allowIdGeneration;
private final ClusterService clusterService; private final ClusterService clusterService;
private final IngestService ingestService;
private final TransportShardBulkAction shardBulkAction; private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction; private final TransportCreateIndexAction createIndexAction;
private final LongSupplier relativeTimeProvider; private final LongSupplier relativeTimeProvider;
private final IngestActionForwarder ingestForwarder;
@Inject @Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction, TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) { AutoCreateIndex autoCreateIndex) {
this(settings, threadPool, transportService, clusterService, this(settings, threadPool, transportService, clusterService, ingestService,
shardBulkAction, createIndexAction, shardBulkAction, createIndexAction,
actionFilters, indexNameExpressionResolver, actionFilters, indexNameExpressionResolver,
autoCreateIndex, autoCreateIndex,
System::nanoTime); System::nanoTime);
} }
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction, TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) { AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, BulkRequest::new); super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, BulkRequest::new);
Objects.requireNonNull(relativeTimeProvider); Objects.requireNonNull(relativeTimeProvider);
this.clusterService = clusterService; this.clusterService = clusterService;
this.ingestService = ingestService;
this.shardBulkAction = shardBulkAction; this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction; this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex; this.autoCreateIndex = autoCreateIndex;
this.allowIdGeneration = this.settings.getAsBoolean("action.bulk.action.allow_id_generation", true); this.allowIdGeneration = this.settings.getAsBoolean("action.bulk.action.allow_id_generation", true);
this.relativeTimeProvider = relativeTimeProvider; this.relativeTimeProvider = relativeTimeProvider;
this.ingestForwarder = new IngestActionForwarder(transportService);
clusterService.add(this.ingestForwarder);
} }
@Override @Override
@ -112,6 +125,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override @Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) { protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
if (bulkRequest.hasIndexRequestsWithPipelines()) {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
return;
}
final long startTime = relativeTime(); final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size()); final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
@ -376,4 +398,131 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return relativeTimeProvider.getAsLong(); return relativeTimeProvider.getAsLong();
} }
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
bulkRequestModifier.markCurrentItemAsFailed(exception);
}, (exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
doExecute(task, bulkRequest, actionListener);
}
}
});
}
static final class BulkRequestModifier implements Iterator<DocWriteRequest> {
final BulkRequest bulkRequest;
final SparseFixedBitSet failedSlots;
final List<BulkItemResponse> itemResponses;
int currentSlot = -1;
int[] originalSlots;
BulkRequestModifier(BulkRequest bulkRequest) {
this.bulkRequest = bulkRequest;
this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size());
this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
}
@Override
public DocWriteRequest next() {
return bulkRequest.requests().get(++currentSlot);
}
@Override
public boolean hasNext() {
return (currentSlot + 1) < bulkRequest.requests().size();
}
BulkRequest getBulkRequest() {
if (itemResponses.isEmpty()) {
return bulkRequest;
} else {
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
modifiedBulkRequest.timeout(bulkRequest.timeout());
int slot = 0;
List<DocWriteRequest> requests = bulkRequest.requests();
originalSlots = new int[requests.size()]; // oversize, but that's ok
for (int i = 0; i < requests.size(); i++) {
DocWriteRequest request = requests.get(i);
if (failedSlots.get(i) == false) {
modifiedBulkRequest.add(request);
originalSlots[slot++] = i;
}
}
return modifiedBulkRequest;
}
}
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
if (itemResponses.isEmpty()) {
return ActionListener.wrap(
response -> actionListener.onResponse(
new BulkResponse(response.getItems(), response.getTookInMillis(), ingestTookInMillis)),
actionListener::onFailure);
} else {
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
}
}
void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
// We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
// 2) Add a bulk item failure for this request
// 3) Continue with the next request in the bulk.
failedSlots.set(currentSlot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
}
}
static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {
private final long ingestTookInMillis;
private final int[] originalSlots;
private final List<BulkItemResponse> itemResponses;
private final ActionListener<BulkResponse> actionListener;
IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses, ActionListener<BulkResponse> actionListener) {
this.ingestTookInMillis = ingestTookInMillis;
this.itemResponses = itemResponses;
this.actionListener = actionListener;
this.originalSlots = originalSlots;
}
@Override
public void onResponse(BulkResponse response) {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), response.getTookInMillis(), ingestTookInMillis));
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}
} }

View File

@ -19,11 +19,14 @@
package org.elasticsearch.action.index; package org.elasticsearch.action.index;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationOperation;
@ -36,6 +39,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
@ -46,6 +50,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -67,14 +72,16 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
private final TransportCreateIndexAction createIndexAction; private final TransportCreateIndexAction createIndexAction;
private final ClusterService clusterService; private final ClusterService clusterService;
private final IngestService ingestService;
private final MappingUpdatedAction mappingUpdatedAction; private final MappingUpdatedAction mappingUpdatedAction;
private final IngestActionForwarder ingestForwarder;
@Inject @Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, IndicesService indicesService, IngestService ingestService, ThreadPool threadPool,
TransportCreateIndexAction createIndexAction, MappingUpdatedAction mappingUpdatedAction, ShardStateAction shardStateAction, TransportCreateIndexAction createIndexAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
AutoCreateIndex autoCreateIndex) { IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX); actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
this.mappingUpdatedAction = mappingUpdatedAction; this.mappingUpdatedAction = mappingUpdatedAction;
@ -82,13 +89,24 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
this.autoCreateIndex = autoCreateIndex; this.autoCreateIndex = autoCreateIndex;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.clusterService = clusterService; this.clusterService = clusterService;
this.ingestService = ingestService;
this.ingestForwarder = new IngestActionForwarder(transportService);
clusterService.add(this.ingestForwarder);
} }
@Override @Override
protected void doExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) { protected void doExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
if (Strings.hasText(request.getPipeline())) {
if (clusterService.localNode().isIngestNode()) {
processIngestIndexRequest(task, request, listener);
} else {
ingestForwarder.forwardIngestRequest(IndexAction.INSTANCE, request, listener);
}
return;
}
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) { if (shouldAutoCreate(request, state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(); CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(request.index()); createIndexRequest.index(request.index());
createIndexRequest.cause("auto(index api)"); createIndexRequest.cause("auto(index api)");
@ -119,6 +137,10 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
} }
} }
protected boolean shouldAutoCreate(IndexRequest request, ClusterState state) {
return autoCreateIndex.shouldAutoCreate(request.index(), state);
}
@Override @Override
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) { protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
super.resolveRequest(metaData, indexMetaData, request); super.resolveRequest(metaData, indexMetaData, request);
@ -130,7 +152,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
request.setShardId(shardId); request.setShardId(shardId);
} }
private void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) { protected void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
super.doExecute(task, request, listener); super.doExecute(task, request, listener);
} }
@ -227,5 +249,18 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
return primary.index(operation); return primary.index(operation);
} }
private void processIngestIndexRequest(Task task, IndexRequest indexRequest, ActionListener listener) {
ingestService.getPipelineExecutionService().executeIndexRequest(indexRequest, t -> {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to execute pipeline [{}]", indexRequest.getPipeline()), t);
listener.onFailure(t);
}, success -> {
// TransportIndexAction uses IndexRequest and same action name on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence we set the pipeline to null once its execution completed.
indexRequest.setPipeline(null);
doExecute(task, indexRequest, listener);
});
}
} }

View File

@ -1,238 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.tasks.Task;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public final class IngestActionFilter extends AbstractComponent implements ActionFilter {
private final PipelineExecutionService executionService;
@Inject
public IngestActionFilter(Settings settings, NodeService nodeService) {
super(settings);
this.executionService = nodeService.getIngestService().getPipelineExecutionService();
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
switch (action) {
case IndexAction.NAME:
IndexRequest indexRequest = (IndexRequest) request;
if (Strings.hasText(indexRequest.getPipeline())) {
processIndexRequest(task, action, listener, chain, (IndexRequest) request);
} else {
chain.proceed(task, action, request, listener);
}
break;
case BulkAction.NAME:
BulkRequest bulkRequest = (BulkRequest) request;
if (bulkRequest.hasIndexRequestsWithPipelines()) {
@SuppressWarnings("unchecked")
ActionListener<BulkResponse> actionListener = (ActionListener<BulkResponse>) listener;
processBulkIndexRequest(task, bulkRequest, action, chain, actionListener);
} else {
chain.proceed(task, action, request, listener);
}
break;
default:
chain.proceed(task, action, request, listener);
break;
}
}
void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) {
executionService.executeIndexRequest(indexRequest, t -> {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to execute pipeline [{}]", indexRequest.getPipeline()), t);
listener.onFailure(t);
}, success -> {
// TransportIndexAction uses IndexRequest and same action name on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence we set the pipeline to null once its execution completed.
indexRequest.setPipeline(null);
chain.proceed(task, action, indexRequest, listener);
});
}
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
bulkRequestModifier.markCurrentItemAsFailed(exception);
}, (exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
chain.proceed(task, action, bulkRequest, actionListener);
}
}
});
}
@Override
public int order() {
return Integer.MAX_VALUE;
}
static final class BulkRequestModifier implements Iterator<DocWriteRequest> {
final BulkRequest bulkRequest;
final Set<Integer> failedSlots;
final List<BulkItemResponse> itemResponses;
int currentSlot = -1;
int[] originalSlots;
BulkRequestModifier(BulkRequest bulkRequest) {
this.bulkRequest = bulkRequest;
this.failedSlots = new HashSet<>();
this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
}
@Override
public DocWriteRequest next() {
return bulkRequest.requests().get(++currentSlot);
}
@Override
public boolean hasNext() {
return (currentSlot + 1) < bulkRequest.requests().size();
}
BulkRequest getBulkRequest() {
if (itemResponses.isEmpty()) {
return bulkRequest;
} else {
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
modifiedBulkRequest.timeout(bulkRequest.timeout());
int slot = 0;
originalSlots = new int[bulkRequest.requests().size() - failedSlots.size()];
for (int i = 0; i < bulkRequest.requests().size(); i++) {
DocWriteRequest request = bulkRequest.requests().get(i);
if (failedSlots.contains(i) == false) {
modifiedBulkRequest.add(request);
originalSlots[slot++] = i;
}
}
return modifiedBulkRequest;
}
}
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
if (itemResponses.isEmpty()) {
return new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
actionListener.onResponse(new BulkResponse(response.getItems(), response.getTookInMillis(), ingestTookInMillis));
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
};
} else {
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
}
}
void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
// We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
// 2) Add a bulk item failure for this request
// 3) Continue with the next request in the bulk.
failedSlots.add(currentSlot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
}
}
static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {
private final long ingestTookInMillis;
private final int[] originalSlots;
private final List<BulkItemResponse> itemResponses;
private final ActionListener<BulkResponse> actionListener;
IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses, ActionListener<BulkResponse> actionListener) {
this.ingestTookInMillis = ingestTookInMillis;
this.itemResponses = itemResponses;
this.actionListener = actionListener;
this.originalSlots = originalSlots;
}
@Override
public void onResponse(BulkResponse response) {
for (int i = 0; i < response.getItems().length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), response.getTookInMillis(), ingestTookInMillis));
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.transport.TransportService;
/**
* A utility for forwarding ingest requests to ingest nodes in a round-robin fashion.
*
* TODO: move this into IngestService and make index/bulk actions call that
*/
public final class IngestActionForwarder implements ClusterStateListener {
private final TransportService transportService;
private final AtomicInteger ingestNodeGenerator = new AtomicInteger(Randomness.get().nextInt());
private DiscoveryNode[] ingestNodes;
public IngestActionForwarder(TransportService transportService) {
this.transportService = transportService;
ingestNodes = new DiscoveryNode[0];
}
public void forwardIngestRequest(Action<?, ?, ?> action, ActionRequest request, ActionListener<?> listener) {
transportService.sendRequest(randomIngestNode(), action.name(), request,
new ActionListenerResponseHandler(listener, action::newResponse));
}
private DiscoveryNode randomIngestNode() {
final DiscoveryNode[] nodes = ingestNodes;
if (nodes.length == 0) {
throw new IllegalStateException("There are no ingest nodes in this cluster, unable to forward request to an ingest node.");
}
return nodes[Math.floorMod(ingestNodeGenerator.incrementAndGet(), nodes.length)];
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
ingestNodes = event.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode.class);
}
}

View File

@ -1,114 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicInteger;
public final class IngestProxyActionFilter implements ActionFilter {
private final ClusterService clusterService;
private final TransportService transportService;
private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());
@Inject
public IngestProxyActionFilter(ClusterService clusterService, TransportService transportService) {
this.clusterService = clusterService;
this.transportService = transportService;
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
Action ingestAction;
switch (action) {
case IndexAction.NAME:
ingestAction = IndexAction.INSTANCE;
IndexRequest indexRequest = (IndexRequest) request;
if (Strings.hasText(indexRequest.getPipeline())) {
forwardIngestRequest(ingestAction, request, listener);
} else {
chain.proceed(task, action, request, listener);
}
break;
case BulkAction.NAME:
ingestAction = BulkAction.INSTANCE;
BulkRequest bulkRequest = (BulkRequest) request;
if (bulkRequest.hasIndexRequestsWithPipelines()) {
forwardIngestRequest(ingestAction, request, listener);
} else {
chain.proceed(task, action, request, listener);
}
break;
default:
chain.proceed(task, action, request, listener);
break;
}
}
@SuppressWarnings("unchecked")
private void forwardIngestRequest(Action<?, ?, ?> action, ActionRequest request, ActionListener<?> listener) {
transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener, action::newResponse));
}
@Override
public int order() {
return Integer.MAX_VALUE;
}
private DiscoveryNode randomIngestNode() {
assert clusterService.localNode().isIngestNode() == false;
DiscoveryNodes nodes = clusterService.state().getNodes();
DiscoveryNode[] ingestNodes = nodes.getIngestNodes().values().toArray(DiscoveryNode.class);
if (ingestNodes.length == 0) {
throw new IllegalStateException("There are no ingest nodes in this cluster, unable to forward request to an ingest node.");
}
int index = getNodeNumber();
return ingestNodes[(index) % ingestNodes.length];
}
private int getNodeNumber() {
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
return index;
}
}

View File

@ -147,7 +147,7 @@ public abstract class TransportClient extends AbstractClient {
modules.add(pluginModule); modules.add(pluginModule);
} }
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool)); modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(), ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getClusterSettings(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class)); threadPool, pluginsService.filterPlugins(ActionPlugin.class));
modules.add(actionModule); modules.add(actionModule);

View File

@ -343,9 +343,8 @@ public class Node implements Closeable {
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule); modules.add(indicesModule);
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
ActionModule actionModule = new ActionModule(DiscoveryNode.isIngestNode(settings), false, settings, ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
clusterModule.getIndexNameExpressionResolver(), settingsModule.getClusterSettings(), settingsModule.getClusterSettings(), threadPool, pluginsService.filterPlugins(ActionPlugin.class));
threadPool, pluginsService.filterPlugins(ActionPlugin.class));
modules.add(actionModule); modules.add(actionModule);
modules.add(new GatewayModule()); modules.add(new GatewayModule());
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class))); modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class)));

View File

@ -441,7 +441,7 @@ public class TransportService extends AbstractLifecycleComponent {
return futureHandler; return futureHandler;
} }
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request, final TransportRequest request,
final TransportResponseHandler<T> handler) { final TransportResponseHandler<T> handler) {
sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler); sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
@ -626,7 +626,7 @@ public class TransportService extends AbstractLifecycleComponent {
* @param executor The executor the request handling will be executed on * @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling * @param handler The handler itself that implements the request handling
*/ */
public final <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory,
String executor, TransportRequestHandler<Request> handler) { String executor, TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, handler); handler = interceptor.interceptHandler(action, executor, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>( RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
@ -644,7 +644,7 @@ public class TransportService extends AbstractLifecycleComponent {
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
* @param handler The handler itself that implements the request handling * @param handler The handler itself that implements the request handling
*/ */
public final <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request,
String executor, boolean forceExecution, String executor, boolean forceExecution,
boolean canTripCircuitBreaker, boolean canTripCircuitBreaker,
TransportRequestHandler<Request> handler) { TransportRequestHandler<Request> handler) {

View File

@ -1,5 +1,3 @@
package org.elasticsearch.action.ingest;
/* /*
* Licensed to Elasticsearch under one or more contributor * Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with * license agreements. See the NOTICE file distributed with
@ -19,16 +17,7 @@ package org.elasticsearch.action.ingest;
* under the License. * under the License.
*/ */
import org.elasticsearch.action.ActionListener; package org.elasticsearch.action.bulk;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -36,6 +25,14 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -50,7 +47,7 @@ public class BulkRequestModifierTests extends ESTestCase {
bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}")); bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}"));
} }
CaptureActionListener actionListener = new CaptureActionListener(); CaptureActionListener actionListener = new CaptureActionListener();
IngestActionFilter.BulkRequestModifier bulkRequestModifier = new IngestActionFilter.BulkRequestModifier(bulkRequest); TransportBulkAction.BulkRequestModifier bulkRequestModifier = new TransportBulkAction.BulkRequestModifier(bulkRequest);
int i = 0; int i = 0;
Set<Integer> failedSlots = new HashSet<>(); Set<Integer> failedSlots = new HashSet<>();
@ -91,7 +88,7 @@ public class BulkRequestModifierTests extends ESTestCase {
originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i))); originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
} }
IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest); TransportBulkAction.BulkRequestModifier modifier = new TransportBulkAction.BulkRequestModifier(originalBulkRequest);
for (int i = 0; modifier.hasNext(); i++) { for (int i = 0; modifier.hasNext(); i++) {
modifier.next(); modifier.next();
if (i % 2 == 0) { if (i % 2 == 0) {
@ -135,7 +132,7 @@ public class BulkRequestModifierTests extends ESTestCase {
originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i))); originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
} }
IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest); TransportBulkAction.BulkRequestModifier modifier = new TransportBulkAction.BulkRequestModifier(originalBulkRequest);
while (modifier.hasNext()) { while (modifier.hasNext()) {
modifier.next(); modifier.next();
} }

View File

@ -0,0 +1,250 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TransportBulkActionIngestTests extends ESTestCase {
/** Services needed by bulk action */
TransportService transportService;
ClusterService clusterService;
IngestService ingestService;
/** The ingest execution service we can capture calls to */
PipelineExecutionService executionService;
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
@Captor
ArgumentCaptor<BiConsumer<IndexRequest, Exception>> failureHandler;
@Captor
ArgumentCaptor<Consumer<Exception>> completionHandler;
@Captor
ArgumentCaptor<TransportResponseHandler<BulkResponse>> remoteResponseHandler;
@Captor
ArgumentCaptor<Iterable<DocWriteRequest>> bulkDocsItr;
/** The actual action we want to test, with real indexing mocked */
TestTransportBulkAction action;
/** True if the next call to the index action should act as an ingest node */
boolean localIngest;
/** The nodes that forwarded index requests should be cycled through. */
DiscoveryNodes nodes;
DiscoveryNode remoteNode1;
DiscoveryNode remoteNode2;
/** A subclass of the real bulk action to allow skipping real bulk indexing, and marking when it would have happened. */
class TestTransportBulkAction extends TransportBulkAction {
boolean isExecuted = false; // set when the "real" bulk execution happens
TestTransportBulkAction() {
super(Settings.EMPTY, null, transportService, clusterService, ingestService,
null, null, new ActionFilters(Collections.emptySet()), null, null);
}
@Override
protected boolean needToCheck() {
return false;
}
@Override
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses) {
isExecuted = true;
}
}
@Before
public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
MockitoAnnotations.initMocks(this);
// setup services that will be called by action
transportService = mock(TransportService.class);
clusterService = mock(ClusterService.class);
localIngest = true;
// setup nodes for local and remote
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.isIngestNode()).thenAnswer(stub -> localIngest);
when(clusterService.localNode()).thenReturn(localNode);
remoteNode1 = mock(DiscoveryNode.class);
remoteNode2 = mock(DiscoveryNode.class);
nodes = mock(DiscoveryNodes.class);
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
when(event.state()).thenReturn(state);
((ClusterStateListener)invocation.getArguments()[0]).clusterChanged(event);
return null;
}).when(clusterService).add(any(ClusterStateListener.class));
// setup the mocked ingest service for capturing calls
ingestService = mock(IngestService.class);
executionService = mock(PipelineExecutionService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
action = new TestTransportBulkAction();
reset(transportService); // call on construction of action
}
public void testIngestSkipped() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
bulkRequest.add(indexRequest);
action.execute(null, bulkRequest, ActionListener.wrap(response -> {}, exception -> {
throw new AssertionError(exception);
}));
assertTrue(action.isExecuted);
verifyZeroInteractions(ingestService);
}
public void testIngestLocal() throws Exception {
Exception exception = new Exception("fake exception");
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("index", "type", "id");
indexRequest1.source(Collections.emptyMap());
indexRequest1.setPipeline("testpipeline");
IndexRequest indexRequest2 = new IndexRequest("index", "type", "id");
indexRequest2.source(Collections.emptyMap());
indexRequest2.setPipeline("testpipeline");
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
action.execute(null, bulkRequest, ActionListener.wrap(
response -> {
BulkItemResponse itemResponse = response.iterator().next();
assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception"));
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));
// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
// now check success
Iterator<DocWriteRequest> req = bulkDocsItr.getValue().iterator();
failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request
indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}
public void testIngestForward() throws Exception {
localIngest = false;
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
bulkRequest.add(indexRequest);
BulkResponse bulkResponse = mock(BulkResponse.class);
AtomicBoolean responseCalled = new AtomicBoolean(false);
ActionListener<BulkResponse> listener = ActionListener.wrap(
response -> {
responseCalled.set(true);
assertSame(bulkResponse, response);
},
e -> {
throw new AssertionError(e);
});
action.execute(null, bulkRequest, listener);
// should not have executed ingest locally
verify(executionService, never()).executeBulkRequest(any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
boolean usedNode1 = node.getValue() == remoteNode1; // make sure we used one of the nodes
if (usedNode1 == false) {
assertSame(remoteNode2, node.getValue());
}
assertFalse(action.isExecuted); // no local index execution
assertFalse(responseCalled.get()); // listener not called yet
remoteResponseHandler.getValue().handleResponse(bulkResponse); // call the listener for the remote node
assertTrue(responseCalled.get()); // now the listener we passed should have been delegated to by the remote listener
assertFalse(action.isExecuted); // still no local index execution
// now make sure ingest nodes are rotated through with a subsequent request
reset(transportService);
action.execute(null, bulkRequest, listener);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
if (usedNode1) {
assertSame(remoteNode2, node.getValue());
} else {
assertSame(remoteNode1, node.getValue());
}
}
}

View File

@ -237,6 +237,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
threadPool, threadPool,
transportService, transportService,
clusterService, clusterService,
null,
shardBulkAction, shardBulkAction,
createIndexAction, createIndexAction,
actionFilters, actionFilters,

View File

@ -0,0 +1,224 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.index;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TransportIndexActionIngestTests extends ESTestCase {
/** Services needed by index action */
TransportService transportService;
ClusterService clusterService;
IngestService ingestService;
/** The ingest execution service we can capture calls to */
PipelineExecutionService executionService;
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
@Captor
ArgumentCaptor<Consumer<Exception>> exceptionHandler;
@Captor
ArgumentCaptor<Consumer<Boolean>> successHandler;
@Captor
ArgumentCaptor<TransportResponseHandler<IndexResponse>> remoteResponseHandler;
/** The actual action we want to test, with real indexing mocked */
TestTransportIndexAction action;
/** True if the next call to the index action should act as an ingest node */
boolean localIngest;
/** The nodes that forwarded index requests should be cycled through. */
DiscoveryNodes nodes;
DiscoveryNode remoteNode1;
DiscoveryNode remoteNode2;
/** A subclass of the real index action to allow skipping real indexing, and marking when it would have happened. */
class TestTransportIndexAction extends TransportIndexAction {
boolean isExecuted = false; // set when the "real" index execution happens
TestTransportIndexAction() {
super(Settings.EMPTY, transportService, clusterService, null, ingestService, null, null, null, null,
new ActionFilters(Collections.emptySet()), null, null);
}
@Override
protected boolean shouldAutoCreate(IndexRequest reqest, ClusterState state) {
return false;
}
@Override
protected void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
isExecuted = true;
}
}
@Before
public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
MockitoAnnotations.initMocks(this);
// setup services that will be called by action
transportService = mock(TransportService.class);
clusterService = mock(ClusterService.class);
localIngest = true;
// setup nodes for local and remote
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.isIngestNode()).thenAnswer(stub -> localIngest);
when(clusterService.localNode()).thenReturn(localNode);
remoteNode1 = mock(DiscoveryNode.class);
remoteNode2 = mock(DiscoveryNode.class);
nodes = mock(DiscoveryNodes.class);
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
when(event.state()).thenReturn(state);
((ClusterStateListener)invocation.getArguments()[0]).clusterChanged(event);
return null;
}).when(clusterService).add(any(ClusterStateListener.class));
// setup the mocked ingest service for capturing calls
ingestService = mock(IngestService.class);
executionService = mock(PipelineExecutionService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
action = new TestTransportIndexAction();
reset(transportService); // call on construction of action
}
public void testIngestSkipped() throws Exception {
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
action.execute(null, indexRequest, ActionListener.wrap(response -> {}, exception -> {
throw new AssertionError(exception);
}));
assertTrue(action.isExecuted);
verifyZeroInteractions(ingestService);
}
public void testIngestLocal() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
action.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));
// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(executionService).executeIndexRequest(same(indexRequest), exceptionHandler.capture(), successHandler.capture());
exceptionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
// now check success
successHandler.getValue().accept(true);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}
public void testIngestForward() throws Exception {
localIngest = false;
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
IndexResponse indexResponse = mock(IndexResponse.class);
AtomicBoolean responseCalled = new AtomicBoolean(false);
ActionListener<IndexResponse> listener = ActionListener.wrap(
response -> {
responseCalled.set(true);
assertSame(indexResponse, response);
},
e -> {
throw new AssertionError(e);
});
action.execute(null, indexRequest, listener);
// should not have executed ingest locally
verify(executionService, never()).executeIndexRequest(any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(IndexAction.NAME), any(), remoteResponseHandler.capture());
boolean usedNode1 = node.getValue() == remoteNode1; // make sure we used one of the nodes
if (usedNode1 == false) {
assertSame(remoteNode2, node.getValue());
}
assertFalse(action.isExecuted); // no local index execution
assertFalse(responseCalled.get()); // listener not called yet
remoteResponseHandler.getValue().handleResponse(indexResponse); // call the listener for the remote node
assertTrue(responseCalled.get()); // now the listener we passed should have been delegated to by the remote listener
assertFalse(action.isExecuted); // still no local index execution
// now make sure ingest nodes are rotated through with a subsequent request
reset(transportService);
action.execute(null, indexRequest, listener);
verify(transportService).sendRequest(node.capture(), eq(IndexAction.NAME), any(), remoteResponseHandler.capture());
if (usedNode1) {
assertSame(remoteNode2, node.getValue());
} else {
assertSame(remoteNode1, node.getValue());
}
}
}

View File

@ -1,235 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.mockito.stubbing.Answer;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class IngestActionFilterTests extends ESTestCase {
private IngestActionFilter filter;
private PipelineExecutionService executionService;
@Before
public void setup() {
executionService = mock(PipelineExecutionService.class);
IngestService ingestService = mock(IngestService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
NodeService nodeService = mock(NodeService.class);
when(nodeService.getIngestService()).thenReturn(ingestService);
filter = new IngestActionFilter(Settings.EMPTY, nodeService);
}
public void testApplyNoPipelineId() throws Exception {
IndexRequest indexRequest = new IndexRequest();
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener);
verifyZeroInteractions(executionService, actionFilterChain);
}
public void testApplyBulkNoPipelineId() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest());
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
verify(actionFilterChain).proceed(task, BulkAction.NAME, bulkRequest, actionListener);
verifyZeroInteractions(executionService, actionFilterChain);
}
@SuppressWarnings("unchecked")
public void testApplyIngestIdViaRequestParam() throws Exception {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field", "value");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
verifyZeroInteractions(actionFilterChain);
}
@SuppressWarnings("unchecked")
public void testApplyExecuted() throws Exception {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field", "value");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
Answer answer = invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Boolean> listener = (Consumer) invocationOnMock.getArguments()[2];
listener.accept(true);
return null;
};
doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener);
verifyZeroInteractions(actionListener);
}
@SuppressWarnings("unchecked")
public void testApplyFailed() throws Exception {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field", "value");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
RuntimeException exception = new RuntimeException();
Answer answer = invocationOnMock -> {
Consumer<Throwable> handler = (Consumer) invocationOnMock.getArguments()[1];
handler.accept(exception);
return null;
};
doAnswer(answer).when(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
verify(actionListener).onFailure(exception);
verifyZeroInteractions(actionFilterChain);
}
public void testApplyWithBulkRequest() throws Exception {
Task task = mock(Task.class);
ThreadPool threadPool = mock(ThreadPool.class);
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(any())).thenReturn(executorService);
PipelineStore store = mock(PipelineStore.class);
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", randomInt(), new CompoundProcessor(processor)));
executionService = new PipelineExecutionService(store, threadPool);
IngestService ingestService = mock(IngestService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
NodeService nodeService = mock(NodeService.class);
when(nodeService.getIngestService()).thenReturn(ingestService);
filter = new IngestActionFilter(Settings.EMPTY, nodeService);
BulkRequest bulkRequest = new BulkRequest();
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
if (rarely()) {
DocWriteRequest request;
if (randomBoolean()) {
request = new DeleteRequest("_index", "_type", "_id");
} else {
request = new UpdateRequest("_index", "_type", "_id");
}
bulkRequest.add(request);
} else {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field1", "value1");
bulkRequest.add(indexRequest);
}
}
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
verify(actionFilterChain).proceed(eq(task), eq(BulkAction.NAME), eq(bulkRequest), any());
verifyZeroInteractions(actionListener);
int assertedRequests = 0;
for (DocWriteRequest actionRequest : bulkRequest.requests()) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
assertThat(indexRequest.sourceAsMap().size(), equalTo(2));
assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1"));
assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2"));
}
assertedRequests++;
}
assertThat(assertedRequests, equalTo(numRequest));
}
@SuppressWarnings("unchecked")
public void testIndexApiSinglePipelineExecution() {
Answer answer = invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Boolean> listener = (Consumer) invocationOnMock.getArguments()[2];
listener.accept(true);
return null;
};
doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id").source("field", "value");
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
assertThat(indexRequest.getPipeline(), nullValue());
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(executionService, times(1)).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
verify(actionFilterChain, times(2)).proceed(task, IndexAction.NAME, indexRequest, actionListener);
}
}

View File

@ -1,271 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.CustomTypeSafeMatcher;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class IngestProxyActionFilterTests extends ESTestCase {
private TransportService transportService;
@SuppressWarnings("unchecked")
private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes, TransportInterceptor interceptor) {
ClusterState.Builder clusterState = new ClusterState.Builder(new ClusterName("_name"));
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder();
DiscoveryNode localNode = null;
for (int i = 0; i < totalNodes; i++) {
String nodeId = "node" + i;
Map<String, String> attributes = new HashMap<>();
Set<DiscoveryNode.Role> roles = new HashSet<>();
if (i < ingestNodes) {
roles.add(DiscoveryNode.Role.INGEST);
}
DiscoveryNode node = new DiscoveryNode(nodeId, nodeId, buildNewFakeTransportAddress(), attributes, roles, VersionUtils.randomVersion(random()));
builder.add(node);
if (i == totalNodes - 1) {
localNode = node;
}
}
clusterState.nodes(builder);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.localNode()).thenReturn(localNode);
when(clusterService.state()).thenReturn(clusterState.build());
transportService = new TransportService(Settings.EMPTY, null, null, interceptor, null);
return new IngestProxyActionFilter(clusterService, transportService);
}
public void testApplyNoIngestNodes() {
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(1, 5);
IngestProxyActionFilter filter = buildFilter(0, totalNodes, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action;
ActionRequest request;
if (randomBoolean()) {
action = IndexAction.NAME;
request = new IndexRequest().setPipeline("_id");
} else {
action = BulkAction.NAME;
request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
}
try {
filter.apply(task, action, request, actionListener, actionFilterChain);
fail("should have failed because there are no ingest nodes");
} catch(IllegalStateException e) {
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster, unable to forward request to an ingest node."));
}
verifyZeroInteractions(actionFilterChain);
verifyZeroInteractions(actionListener);
}
public void testApplyNoPipelineId() {
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(1, 5);
IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action;
ActionRequest request;
if (randomBoolean()) {
action = IndexAction.NAME;
request = new IndexRequest();
} else {
action = BulkAction.NAME;
request = new BulkRequest().add(new IndexRequest());
}
filter.apply(task, action, request, actionListener, actionFilterChain);
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
verifyZeroInteractions(actionListener);
}
public void testApplyAnyAction() {
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
ActionRequest request = mock(ActionRequest.class);
int totalNodes = randomIntBetween(1, 5);
IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
String action = randomAsciiOfLengthBetween(1, 20);
filter.apply(task, action, request, actionListener, actionFilterChain);
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
verifyZeroInteractions(actionListener);
}
@SuppressWarnings("unchecked")
public void testApplyIndexRedirect() {
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
AtomicBoolean run = new AtomicBoolean(false);
IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
new TransportInterceptor() {
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler) {
assertTrue(run.compareAndSet(false, true));
assertTrue(node.isIngestNode());
assertEquals(action, IndexAction.NAME);
handler.handleResponse((T) new IndexResponse());
}
};
}
});
IndexRequest indexRequest = new IndexRequest().setPipeline("_id");
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verifyZeroInteractions(actionFilterChain);
assertTrue(run.get());
verify(actionListener).onResponse(any(IndexResponse.class));
verify(actionListener, never()).onFailure(any(TransportException.class));
}
@SuppressWarnings("unchecked")
public void testApplyBulkRedirect() {
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
AtomicBoolean run = new AtomicBoolean(false);
IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
new TransportInterceptor() {
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler) {
assertTrue(run.compareAndSet(false, true));
assertTrue(node.isIngestNode());
assertEquals(action, BulkAction.NAME);
handler.handleResponse((T) new BulkResponse(null, -1));
}
};
}
});
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest().setPipeline("_id"));
int numNoPipelineRequests = randomIntBetween(0, 10);
for (int i = 0; i < numNoPipelineRequests; i++) {
bulkRequest.add(new IndexRequest());
}
filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
verifyZeroInteractions(actionFilterChain);
verify(actionListener).onResponse(any(BulkResponse.class));
verify(actionListener, never()).onFailure(any(TransportException.class));
assertTrue(run.get());
}
@SuppressWarnings("unchecked")
public void testApplyFailures() {
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
int totalNodes = randomIntBetween(2, 5);
String requestAction;
ActionRequest request;
if (randomBoolean()) {
requestAction = IndexAction.NAME;
request = new IndexRequest().setPipeline("_id");
} else {
requestAction = BulkAction.NAME;
request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
}
AtomicBoolean run = new AtomicBoolean(false);
IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes,
new TransportInterceptor() {
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler) {
assertTrue(run.compareAndSet(false, true));
assertTrue(node.isIngestNode());
assertEquals(action, requestAction);
handler.handleException(new TransportException(new IllegalArgumentException()));
}
};
}
});
filter.apply(task, requestAction, request, actionListener, actionFilterChain);
verifyZeroInteractions(actionFilterChain);
verify(actionListener).onFailure(any(TransportException.class));
verify(actionListener, never()).onResponse(any(TransportResponse.class));
assertTrue(run.get());
}
}

View File

@ -108,7 +108,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
public void testDynamicDisabled() { public void testDynamicDisabled() {
TransportIndexAction action = new TransportIndexAction(settings, transportService, clusterService, TransportIndexAction action = new TransportIndexAction(settings, transportService, clusterService,
indicesService, THREAD_POOL, shardStateAction, null, null, actionFilters, indexNameExpressionResolver, indicesService, null, THREAD_POOL, shardStateAction, null, null, actionFilters, indexNameExpressionResolver,
autoCreateIndex); autoCreateIndex);
IndexRequest request = new IndexRequest("index", "type", "1"); IndexRequest request = new IndexRequest("index", "type", "1");