Makes search action cancelable by task management API

Long running searches now can be cancelled using standard task cancellation mechanism.
This commit is contained in:
Igor Motov 2016-10-25 12:27:33 -10:00
parent c3761b8e4d
commit 17ad88d539
45 changed files with 1036 additions and 140 deletions

View File

@ -693,7 +693,9 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
ShardStateAction.NoLongerPrimaryShardException::new, 142),
SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 143),
NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144),
STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145);
STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145),
TASK_CANCELLED_EXCEPTION(org.elasticsearch.tasks.TaskCancelledException.class,
org.elasticsearch.tasks.TaskCancelledException::new, 146);
final Class<? extends ElasticsearchException> exceptionClass;
final FunctionThatThrowsIOException<StreamInput, ? extends ElasticsearchException> constructor;

View File

@ -59,6 +59,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected final SearchRequest request;
/** Used by subclasses to resolve node ids to DiscoveryNodes. **/
protected final Function<String, DiscoveryNode> nodeIdToDiscoveryNode;
protected final SearchTask task;
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;
protected final AtomicInteger successfulOps = new AtomicInteger();
@ -74,12 +75,13 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, AliasFilter> aliasFilter, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion) {
long clusterStateVersion, SearchTask task) {
super(startTime);
this.logger = logger;
this.searchTransportService = searchTransportService;
this.executor = executor;
this.request = request;
this.task = task;
this.listener = listener;
this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode;
this.clusterStateVersion = clusterStateVersion;

View File

@ -49,9 +49,9 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, AliasFilter> aliasFilter, SearchPhaseController searchPhaseController,
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
queryFetchResults = new AtomicArray<>(firstResults.length());
}
@ -64,7 +64,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchTransportService.sendExecuteDfs(node, request, listener);
searchTransportService.sendExecuteDfs(node, request, task, listener);
}
@Override
@ -82,7 +82,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchTransportService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, querySearchRequest, task, new ActionListener<QueryFetchSearchResult>() {
@Override
public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());

View File

@ -57,9 +57,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, AliasFilter> aliasFilter, SearchPhaseController searchPhaseController,
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
queryResults = new AtomicArray<>(firstResults.length());
fetchResults = new AtomicArray<>(firstResults.length());
@ -74,7 +75,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchTransportService.sendExecuteDfs(node, request, listener);
searchTransportService.sendExecuteDfs(node, request, task, listener);
}
@Override
@ -91,7 +92,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
searchTransportService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
searchTransportService.sendExecuteQuery(node, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
@ -162,7 +163,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);

View File

@ -43,9 +43,10 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
Map<String, AliasFilter> aliasFilter,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
}
@ -58,7 +59,7 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QueryFetchSearchResult> listener) {
searchTransportService.sendExecuteFetch(node, request, listener);
searchTransportService.sendExecuteFetch(node, request, task, listener);
}
@Override

View File

@ -54,9 +54,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
AliasFilter> aliasFilter,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor, request, listener,
shardsIts, startTime, clusterStateVersion);
shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
@ -70,7 +71,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QuerySearchResultProvider> listener) {
searchTransportService.sendExecuteQuery(node, request, listener);
searchTransportService.sendExecuteQuery(node, request, task, listener);
}
@Override
@ -97,7 +98,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);

View File

@ -31,6 +31,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Arrays;
@ -275,6 +277,11 @@ public final class SearchRequest extends ActionRequest<SearchRequest> implements
return source != null && source.isSuggestOnly();
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -44,6 +44,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final SearchPhaseController searchPhaseController;
private final SearchTransportService searchTransportService;
private final SearchScrollRequest request;
private final SearchTask task;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final DiscoveryNodes nodes;
@ -52,13 +53,14 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final AtomicInteger successfulOps;
private final AtomicInteger counter;
SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task,
ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger;
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.request = request;
this.task = task;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
@ -128,7 +130,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchTransportService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, internalRequest, task, new ActionListener<ScrollQueryFetchSearchResult>() {
@Override
public void onResponse(ScrollQueryFetchSearchResult result) {
queryFetchResults.set(shardIndex, result.result());

View File

@ -44,6 +44,7 @@ import static org.elasticsearch.action.search.TransportSearchHelper.internalScro
class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private final Logger logger;
private final SearchTask task;
private final SearchTransportService searchTransportService;
private final SearchPhaseController searchPhaseController;
private final SearchScrollRequest request;
@ -56,13 +57,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private volatile ScoreDoc[] sortedShardDocs;
private final AtomicInteger successfulOps;
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task,
ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger;
this.searchTransportService = searchTransportService;
this.searchPhaseController = searchPhaseController;
this.request = request;
this.task = task;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
@ -124,7 +126,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchTransportService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
searchTransportService.sendExecuteQuery(node, internalRequest, task, new ActionListener<ScrollQuerySearchResult>() {
@Override
public void onResponse(ScrollQuerySearchResult result) {
queryResults.set(shardIndex, result.queryResult());
@ -184,7 +186,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget());

View File

@ -25,6 +25,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Objects;
@ -107,6 +109,11 @@ public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
out.writeOptionalWriteable(scroll);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
/**
* Task storing information about a currently running search request.
*/
public class SearchTask extends CancellableTask {
public SearchTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
}
}

View File

@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
@ -42,7 +44,10 @@ import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -77,22 +82,22 @@ public class SearchTransportService extends AbstractComponent {
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId),
new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@Override
public void onResponse(SearchFreeContextResponse response) {
// no need to respond if it was freed or not
}
new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@Override
public void onResponse(SearchFreeContextResponse response) {
// no need to respond if it was freed or not
}
@Override
public void onFailure(Exception e) {
@Override
public void onFailure(Exception e) {
}
}, SearchFreeContextResponse::new));
}
}, SearchFreeContextResponse::new));
}
public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
}
public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener<TransportResponse> listener) {
@ -100,59 +105,62 @@ public class SearchTransportService extends AbstractComponent {
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
}
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request,
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<DfsSearchResult> listener) {
transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
transportService.sendChildRequest(node, DFS_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request,
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<QuerySearchResultProvider> listener) {
transportService.sendRequest(node, QUERY_ACTION_NAME, request,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final ActionListener<QuerySearchResult> listener) {
transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request,
transportService.sendChildRequest(node, QUERY_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request,
public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, SearchTask task,
final ActionListener<QuerySearchResult> listener) {
transportService.sendChildRequest(node, QUERY_ID_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task,
final ActionListener<ScrollQuerySearchResult> listener) {
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request,
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
transportService.sendChildRequest(node, QUERY_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request,
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request,
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
transportService.sendChildRequest(node, QUERY_FETCH_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request,
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, SearchTask task,
final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request,
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
transportService.sendChildRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request,
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task,
final ActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request,
new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
transportService.sendChildRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request,
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, task, listener);
}
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request,
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener);
sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, task, listener);
}
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request,
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) {
transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
transportService.sendChildRequest(node, action, request, task,
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
}
static class ScrollFreeContextRequest extends TransportRequest {
@ -252,64 +260,103 @@ public class SearchTransportService extends AbstractComponent {
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
((request, channel) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}));
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
@Override
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
@Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
ThreadPool.Names.SAME, (request, channel) -> {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) throws Exception {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
});
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
DfsSearchResult result = searchService.executeDfsPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QuerySearchResultProvider result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
QuerySearchResultProvider result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
QueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
QueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
@Override
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -88,7 +89,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// pure paranoia if time goes backwards we are at least positive
final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
ClusterState clusterState = clusterService.state();
@ -129,12 +130,17 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
logger.debug("failed to optimize search type, continue as normal", e);
}
searchAsyncAction(searchRequest, shardIterators, startTimeInMillis, clusterState, Collections.unmodifiableMap(aliasFilter)
, listener).start();
searchAsyncAction((SearchTask)task, searchRequest, shardIterators, startTimeInMillis, clusterState,
Collections.unmodifiableMap(aliasFilter), listener).start();
}
private AbstractSearchAsyncAction searchAsyncAction(SearchRequest searchRequest, GroupShardsIterator shardIterators, long startTime,
ClusterState state, Map<String, AliasFilter> aliasFilter,
@Override
protected final void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator shardIterators,
long startTime, ClusterState state, Map<String, AliasFilter> aliasFilter,
ActionListener<SearchResponse> listener) {
final Function<String, DiscoveryNode> nodesLookup = state.nodes()::get;
final long clusterStateVersion = state.version();
@ -144,22 +150,22 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup,
aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
clusterStateVersion, task);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup,
aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
clusterStateVersion, task);
break;
case DFS_QUERY_AND_FETCH:
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup,
aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
clusterStateVersion, task);
break;
case QUERY_AND_FETCH:
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup,
aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
clusterStateVersion, task);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -52,19 +53,24 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService, clusterService);
}
@Override
protected void doExecute(SearchScrollRequest request, ActionListener<SearchResponse> listener) {
protected final void doExecute(SearchScrollRequest request, ActionListener<SearchResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
@Override
protected void doExecute(Task task, SearchScrollRequest request, ActionListener<SearchResponse> listener) {
try {
ParsedScrollId scrollId = parseScrollId(request.scrollId());
AbstractAsyncAction action;
switch (scrollId.getType()) {
case QUERY_THEN_FETCH_TYPE:
action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchTransportService,
searchPhaseController, request, scrollId, listener);
searchPhaseController, request, (SearchTask)task, scrollId, listener);
break;
case QUERY_AND_FETCH_TYPE:
action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchTransportService,
searchPhaseController, request, scrollId, listener);
searchPhaseController, request, (SearchTask)task, scrollId, listener);
break;
default:
throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized");

View File

@ -345,6 +345,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
SearchService.DEFAULT_KEEPALIVE_SETTING,
SearchService.KEEPALIVE_INTERVAL_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
Node.WRITE_PORTS_FIELD_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_DATA_SETTING,

View File

@ -27,6 +27,7 @@ import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
@ -113,8 +114,11 @@ final class DefaultSearchContext extends SearchContext {
private Float minimumScore;
private boolean trackScores = false; // when sorting, track scores as well...
private FieldDoc searchAfter;
private boolean lowLevelCancellation;
// filter for sliced scroll
private SliceBuilder sliceBuilder;
private SearchTask task;
/**
* The original query as sent by the user without the types and aliases
@ -571,6 +575,15 @@ final class DefaultSearchContext extends SearchContext {
return this;
}
@Override
public boolean lowLevelCancellation() {
return lowLevelCancellation;
}
public void lowLevelCancellation(boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}
@Override
public FieldDoc searchAfter() {
return searchAfter;
@ -792,4 +805,19 @@ final class DefaultSearchContext extends SearchContext {
public void setProfilers(Profilers profilers) {
this.profilers = profilers;
}
@Override
public void setTask(SearchTask task) {
this.task = task;
}
@Override
public SearchTask getTask() {
return task;
}
@Override
public boolean isCancelled() {
return task.isCancelled();
}
}

View File

@ -26,6 +26,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -84,6 +85,7 @@ import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
@ -107,6 +109,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope);
public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING =
Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope);
/**
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
* to the cancellation request faster. However, since it will produce more cancellation checks it might slow the search performance
* down.
*/
public static final Setting<Boolean> LOW_LEVEL_CANCELLATION_SETTING =
Setting.boolSetting("search.low_level_cancellation", false, Property.Dynamic, Property.NodeScope);
public static final TimeValue NO_TIMEOUT = timeValueMillis(-1);
public static final Setting<TimeValue> DEFAULT_SEARCH_TIMEOUT_SETTING =
@ -133,6 +142,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private volatile TimeValue defaultSearchTimeout;
private volatile boolean lowLevelCancellation;
private final Cancellable keepAliveReaper;
private final AtomicLong idGenerator = new AtomicLong();
@ -160,12 +171,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout);
lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
}
private void setDefaultSearchTimeout(TimeValue defaultSearchTimeout) {
this.defaultSearchTimeout = defaultSearchTimeout;
}
private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}
@Override
public void afterIndexClosed(Index index, Settings indexSettings) {
// once an index is closed we can just clean up all the pending search context information
@ -212,10 +230,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
keepAliveReaper.cancel();
}
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws IOException {
public DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
context.incRef();
try {
context.setTask(task);
contextProcessing(context);
dfsPhase.execute(context);
contextProcessedSuccessfully(context);
@ -242,11 +261,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws IOException {
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
context.setTask(task);
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);
@ -276,11 +296,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) {
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) {
final SearchContext context = findContext(request.id());
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
context.setTask(task);
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);
@ -299,8 +320,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTask task) {
final SearchContext context = findContext(request.id());
context.setTask(task);
IndexShard indexShard = context.indexShard();
SearchOperationListener operationListener = indexShard.getSearchOperationListener();
context.incRef();
@ -339,11 +361,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws IOException {
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
context.incRef();
try {
contextProcessing(context);
context.setTask(task);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
@ -379,10 +402,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) {
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request, SearchTask task) {
final SearchContext context = findContext(request.id());
context.incRef();
try {
context.setTask(task);
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
@ -420,10 +444,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) {
public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) {
final SearchContext context = findContext(request.id());
context.incRef();
try {
context.setTask(task);
contextProcessing(context);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
processScroll(request, context);
@ -462,11 +487,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public FetchSearchResult executeFetchPhase(ShardFetchRequest request) {
public FetchSearchResult executeFetchPhase(ShardFetchRequest request, SearchTask task) {
final SearchContext context = findContext(request.id());
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
context.setTask(task);
contextProcessing(context);
if (request.lastEmittedDoc() != null) {
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
@ -546,6 +572,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
keepAlive = request.scroll().keepAlive().millis();
}
context.keepAlive(keepAlive);
context.lowLevelCancellation(lowLevelCancellation);
} catch (Exception e) {
context.close();
throw ExceptionsHelper.convertToRuntime(e);
@ -627,6 +654,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private void cleanContext(SearchContext context) {
try {
context.clearReleasables(Lifetime.PHASE);
context.setTask(null);
} finally {
context.decRef();
}

View File

@ -28,9 +28,11 @@ import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.search.SearchContextException;
import org.elasticsearch.search.SearchPhase;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.rescore.RescoreSearchContext;
import org.elasticsearch.tasks.TaskCancelledException;
import java.util.AbstractSet;
import java.util.Collection;
@ -59,6 +61,9 @@ public class DfsPhase implements SearchPhase {
TermStatistics[] termStatistics = new TermStatistics[terms.length];
IndexReaderContext indexReaderContext = context.searcher().getTopReaderContext();
for (int i = 0; i < terms.length; i++) {
if(context.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
// LUCENE 4 UPGRADE: cache TermContext?
TermContext termContext = TermContext.build(indexReaderContext, terms[i]);
termStatistics[i] = context.searcher().termStatistics(terms[i], termContext);
@ -70,6 +75,9 @@ public class DfsPhase implements SearchPhase {
if (!fieldStatistics.containsKey(term.field())) {
final CollectionStatistics collectionStatistics = context.searcher().collectionStatistics(term.field());
fieldStatistics.put(term.field(), collectionStatistics);
if(context.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
}
}

View File

@ -31,7 +31,11 @@ public class DfsPhaseExecutionException extends SearchContextException {
super(context, "Dfs Failed [" + msg + "]", t);
}
public DfsPhaseExecutionException(SearchContext context, String msg) {
super(context, "Dfs Failed [" + msg + "]");
}
public DfsPhaseExecutionException(StreamInput in) throws IOException {
super(in);
}
}
}

View File

@ -51,6 +51,7 @@ import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.tasks.TaskCancelledException;
import java.io.IOException;
import java.util.ArrayList;
@ -136,6 +137,9 @@ public class FetchPhase implements SearchPhase {
InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoadSize()];
FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
if(context.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);

View File

@ -31,6 +31,10 @@ public class FetchPhaseExecutionException extends SearchContextException {
super(context, "Fetch Failed [" + msg + "]", t);
}
public FetchPhaseExecutionException(SearchContext context, String msg) {
super(context, "Fetch Failed [" + msg + "]");
}
public FetchPhaseExecutionException(StreamInput in) throws IOException {
super(in);
}

View File

@ -22,9 +22,12 @@ package org.elasticsearch.search.fetch;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
@ -106,4 +109,9 @@ public class ShardFetchRequest extends TransportRequest {
Lucene.writeScoreDoc(out, lastEmittedDoc);
}
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -23,6 +23,7 @@ import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.unit.TimeValue;
@ -293,6 +294,11 @@ public abstract class FilteredSearchContext extends SearchContext {
in.terminateAfter(terminateAfter);
}
@Override
public boolean lowLevelCancellation() {
return in.lowLevelCancellation();
}
@Override
public SearchContext minimumScore(float minimumScore) {
return in.minimumScore(minimumScore);
@ -516,4 +522,19 @@ public abstract class FilteredSearchContext extends SearchContext {
public QueryShardContext getQueryShardContext() {
return in.getQueryShardContext();
}
@Override
public void setTask(SearchTask task) {
in.setTask(task);
}
@Override
public SearchTask getTask() {
return in.getTask();
}
@Override
public boolean isCancelled() {
return in.isCancelled();
}
}

View File

@ -20,9 +20,12 @@
package org.elasticsearch.search.internal;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
@ -67,4 +70,9 @@ public class InternalScrollSearchRequest extends TransportRequest {
out.writeLong(id);
out.writeOptionalWriteable(scroll);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -23,6 +23,7 @@ import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
@ -96,6 +97,12 @@ public abstract class SearchContext extends AbstractRefCounted implements Releas
return parseFieldMatcher;
}
public abstract void setTask(SearchTask task);
public abstract SearchTask getTask();
public abstract boolean isCancelled();
@Override
public final void close() {
if (closed.compareAndSet(false, true)) { // prevent double closing
@ -220,6 +227,14 @@ public abstract class SearchContext extends AbstractRefCounted implements Releas
public abstract void terminateAfter(int terminateAfter);
/**
* Indicates if the current index should perform frequent low level search cancellation check.
*
* Enabling low-level checks will make long running searches to react to the cancellation request faster. However,
* since it will produce more cancellation checks it might slow the search performance down.
*/
public abstract boolean lowLevelCancellation();
public abstract SearchContext minimumScore(float minimumScore);
public abstract Float minimumScore();

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.internal;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -33,6 +34,8 @@ import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
@ -158,4 +161,9 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public void rewrite(QueryShardContext context) throws IOException {
shardSearchLocalRequest.rewrite(context);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -45,6 +45,7 @@ public class CollectorResult implements ToXContent, Writeable {
public static final String REASON_SEARCH_MIN_SCORE = "search_min_score";
public static final String REASON_SEARCH_MULTI = "search_multi";
public static final String REASON_SEARCH_TIMEOUT = "search_timeout";
public static final String REASON_SEARCH_CANCELLED = "search_cancelled";
public static final String REASON_AGGREGATION = "aggregation";
public static final String REASON_AGGREGATION_GLOBAL = "aggregation_global";

View File

@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FilterCollector;
import org.apache.lucene.search.FilterLeafCollector;
import org.apache.lucene.search.LeafCollector;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.tasks.TaskCancelledException;
import java.io.IOException;
/**
* Collector that checks if the task it is executed under is cancelled.
*/
public class CancellableCollector extends FilterCollector {
private final Provider<Boolean> cancelled;
private final boolean leafLevel;
/**
* Constructor
* @param cancelled supplier of the cancellation flag, the supplier will be called for each segment if lowLevelCancellation is set
* to false and for each collected record if lowLevelCancellation is set to true. In other words this class assumes
* that the supplier is fast, with performance on the order of a volatile read.
* @param lowLevelCancellation true if collector should check for cancellation for each collected record, false if check should be
* performed only once per segment
* @param in wrapped collector
*/
public CancellableCollector(Provider<Boolean> cancelled, boolean lowLevelCancellation, Collector in) {
super(in);
this.cancelled = cancelled;
this.leafLevel = lowLevelCancellation;
}
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (cancelled.get()) {
throw new TaskCancelledException("cancelled");
}
if (leafLevel) {
return new CancellableLeafCollector(super.getLeafCollector(context));
} else {
return super.getLeafCollector(context);
}
}
private class CancellableLeafCollector extends FilterLeafCollector {
private CancellableLeafCollector(LeafCollector in) {
super(in);
}
@Override
public void collect(int doc) throws IOException {
if (cancelled.get()) {
throw new TaskCancelledException("cancelled");
}
super.collect(doc);
}
}
}

View File

@ -362,6 +362,15 @@ public class QueryPhase implements SearchPhase {
}
}
if (collector != null) {
final Collector child = collector;
collector = new CancellableCollector(searchContext.getTask()::isCancelled, searchContext.lowLevelCancellation(), collector);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_CANCELLED,
Collections.singletonList((InternalProfileCollector) child));
}
}
try {
if (collector != null) {
if (doProfile) {

View File

@ -22,10 +22,13 @@ package org.elasticsearch.search.query;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
@ -82,4 +85,9 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
dfs.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/**
* A generic exception that can be thrown by a task when it's cancelled by the task manager API
*/
public class TaskCancelledException extends ElasticsearchException {
public TaskCancelledException(String msg) {
super(msg);
}
public TaskCancelledException(StreamInput in) throws IOException{
super(in);
}
}

View File

@ -459,7 +459,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
if (cancellationReason == null) {
nodesWithChildTasks.add(nodeId);
} else {
throw new IllegalStateException("cannot register child task request, the task is already cancelled");
throw new TaskCancelledException("cannot register child task request, the task is already cancelled");
}
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
/**
* Transport request handlers that is using task context
*/
public abstract class TaskAwareTransportRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
@Override
public final void messageReceived(T request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required");
}
}

View File

@ -46,6 +46,8 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
@ -462,6 +464,27 @@ public class TransportService extends AbstractLifecycleComponent {
asyncSender.sendRequest(node, action, request, options, handler);
}
public <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
final TransportRequest request, final Task parentTask,
final TransportResponseHandler<T> handler) {
sendChildRequest(node, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
}
public <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
request.setParentTask(localNode.getId(), parentTask.getId());
try {
taskManager.registerChildTask(parentTask, node.getId());
sendRequest(node, action, request, options, handler);
} catch (TaskCancelledException ex) {
// The parent task is already cancelled - just fail the request
handler.handleException(new TransportException(ex));
}
}
private <T extends TransportResponse> void sendRequestInternal(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,

View File

@ -795,6 +795,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(143, org.elasticsearch.script.ScriptException.class);
ids.put(144, org.elasticsearch.cluster.NotMasterException.class);
ids.put(145, org.elasticsearch.ElasticsearchStatusException.class);
ids.put(146, org.elasticsearch.tasks.TaskCancelledException.class);
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
@ -168,7 +169,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
try {
awaitBusy(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled");
throw new TaskCancelledException("Cancelled");
}
return false;
});

View File

@ -85,7 +85,7 @@ public class SearchAsyncActionTests extends ESTestCase {
Map<String, DiscoveryNode> lookup = new HashMap<>();
lookup.put(primaryNode.getId(), primaryNode);
AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction<TestSearchPhaseResult>(logger, transportService, lookup::get,
Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0) {
Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) {
TestSearchResponse response = new TestSearchResponse();
@Override

View File

@ -0,0 +1,297 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.AbstractSearchScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.NativeScriptFactory;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
public class SearchCancellationIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(ScriptedBlockPlugin.class);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()).build();
}
private void indexTestData() {
for (int i = 0; i < 10; i++) {
// Make sure we have a few segments
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for(int j=0; j<10; j++) {
bulkRequestBuilder.add(client().prepareIndex("test", "type", Integer.toString(i*10 + j)).setSource("field", "value"));
}
assertNoFailures(bulkRequestBuilder.get());
}
}
private List<ScriptedBlockPlugin> initBlockFactory() {
List<ScriptedBlockPlugin> plugins = new ArrayList<>();
for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) {
plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class));
}
for (ScriptedBlockPlugin plugin : plugins) {
plugin.scriptedBlockFactory.reset();
plugin.scriptedBlockFactory.enableBlock();
}
return plugins;
}
private void awaitForBlock(List<ScriptedBlockPlugin> plugins) throws Exception {
int numberOfShards = getNumShards("test").numPrimaries;
assertBusy(() -> {
int numberOfBlockedPlugins = 0;
for (ScriptedBlockPlugin plugin : plugins) {
numberOfBlockedPlugins += plugin.scriptedBlockFactory.hits.get();
}
logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards);
assertThat(numberOfBlockedPlugins, greaterThan(0));
});
}
private void disableBlocks(List<ScriptedBlockPlugin> plugins) throws Exception {
for (ScriptedBlockPlugin plugin : plugins) {
plugin.scriptedBlockFactory.disableBlock();
}
}
private void cancelSearch(String action) {
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().setActions(action).get();
assertThat(listTasksResponse.getTasks(), hasSize(1));
TaskInfo searchTask = listTasksResponse.getTasks().get(0);
logger.info("Cancelling search");
CancelTasksResponse cancelTasksResponse = client().admin().cluster().prepareCancelTasks().setTaskId(searchTask.getTaskId()).get();
assertThat(cancelTasksResponse.getTasks(), hasSize(1));
assertThat(cancelTasksResponse.getTasks().get(0).getTaskId(), equalTo(searchTask.getTaskId()));
}
private SearchResponse ensureSearchWasCancelled(ListenableActionFuture<SearchResponse> searchResponse) {
try {
SearchResponse response = searchResponse.actionGet();
logger.info("Search response {}", response);
assertNotEquals("At least one shard should have failed", 0, response.getFailedShards());
return response;
} catch (SearchPhaseExecutionException ex) {
logger.info("All shards failed with", ex);
return null;
}
}
public void testCancellationDuringQueryPhase() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
logger.info("Executing search");
ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery(
scriptQuery(new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null)))
.execute();
awaitForBlock(plugins);
cancelSearch(SearchAction.NAME);
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
}
public void testCancellationDuringFetchPhase() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
logger.info("Executing search");
ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.addScriptField("test_field",
new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null)
).execute();
awaitForBlock(plugins);
cancelSearch(SearchAction.NAME);
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
}
public void testCancellationOfScrollSearches() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
logger.info("Executing search");
ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setScroll(TimeValue.timeValueSeconds(10))
.setSize(5)
.setQuery(
scriptQuery(new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null)))
.execute();
awaitForBlock(plugins);
cancelSearch(SearchAction.NAME);
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
}
public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
// Disable block so the first request would pass
disableBlocks(plugins);
logger.info("Executing search");
TimeValue keepAlive = TimeValue.timeValueSeconds(5);
SearchResponse searchResponse = client().prepareSearch("test")
.setScroll(keepAlive)
.setSize(2)
.setQuery(
scriptQuery(new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null)))
.get();
assertNotNull(searchResponse.getScrollId());
// Enable block so the second request would block
for (ScriptedBlockPlugin plugin : plugins) {
plugin.scriptedBlockFactory.reset();
plugin.scriptedBlockFactory.enableBlock();
}
String scrollId = searchResponse.getScrollId();
logger.info("Executing scroll with id {}", scrollId);
ListenableActionFuture<SearchResponse> scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(keepAlive).execute();
awaitForBlock(plugins);
cancelSearch(SearchScrollAction.NAME);
disableBlocks(plugins);
SearchResponse response = ensureSearchWasCancelled(scrollResponse);
if (response != null){
// The response didn't fail completely - update scroll id
scrollId = response.getScrollId();
}
logger.info("Cleaning scroll with id {}", scrollId);
client().prepareClearScroll().addScrollId(scrollId).get();
}
public static class ScriptedBlockPlugin extends Plugin implements ScriptPlugin {
private NativeTestScriptedBlockFactory scriptedBlockFactory;
public ScriptedBlockPlugin() {
scriptedBlockFactory = new NativeTestScriptedBlockFactory();
}
@Override
public List<NativeScriptFactory> getNativeScripts() {
return Collections.singletonList(scriptedBlockFactory);
}
}
private static class NativeTestScriptedBlockFactory implements NativeScriptFactory {
public static final String TEST_NATIVE_BLOCK_SCRIPT = "native_test_search_block_script";
private final AtomicInteger hits = new AtomicInteger();
private final AtomicBoolean shouldBlock = new AtomicBoolean(true);
public NativeTestScriptedBlockFactory() {
}
public void reset() {
hits.set(0);
}
public void disableBlock() {
shouldBlock.set(false);
}
public void enableBlock() {
shouldBlock.set(true);
}
@Override
public ExecutableScript newScript(Map<String, Object> params) {
return new NativeTestScriptedBlock();
}
@Override
public boolean needsScores() {
return false;
}
@Override
public String getName() {
return TEST_NATIVE_BLOCK_SCRIPT;
}
public class NativeTestScriptedBlock extends AbstractSearchScript {
@Override
public Object run() {
hits.incrementAndGet();
try {
awaitBusy(() -> shouldBlock.get() == false);
} catch (Exception e) {
throw new RuntimeException(e);
}
return true;
}
}
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.search.query.CancellableCollector;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class SearchCancellationTests extends ESTestCase {
static Directory dir;
static IndexReader reader;
@BeforeClass
public static void before() throws IOException {
dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
indexRandomDocuments(w, TestUtil.nextInt(random(), 2, 20));
w.flush();
indexRandomDocuments(w, TestUtil.nextInt(random(), 1, 20));
reader = w.getReader();
w.close();
}
private static void indexRandomDocuments(RandomIndexWriter w, int numDocs) throws IOException {
for (int i = 0; i < numDocs; ++i) {
final int numHoles = random().nextInt(5);
for (int j = 0; j < numHoles; ++j) {
w.addDocument(new Document());
}
Document doc = new Document();
doc.add(new StringField("foo", "bar", Field.Store.NO));
w.addDocument(doc);
}
}
@AfterClass
public static void after() throws IOException {
IOUtils.close(reader, dir);
dir = null;
reader = null;
}
public void testLowLevelCancellableCollector() throws IOException {
TotalHitCountCollector collector = new TotalHitCountCollector();
AtomicBoolean cancelled = new AtomicBoolean();
CancellableCollector cancellableCollector = new CancellableCollector(cancelled::get, true, collector);
final LeafCollector leafCollector = cancellableCollector.getLeafCollector(reader.leaves().get(0));
leafCollector.collect(0);
cancelled.set(true);
expectThrows(TaskCancelledException.class, () -> leafCollector.collect(1));
}
public void testCancellableCollector() throws IOException {
TotalHitCountCollector collector = new TotalHitCountCollector();
AtomicBoolean cancelled = new AtomicBoolean();
CancellableCollector cancellableCollector = new CancellableCollector(cancelled::get, false, collector);
final LeafCollector leafCollector = cancellableCollector.getLeafCollector(reader.leaves().get(0));
leafCollector.collect(0);
cancelled.set(true);
leafCollector.collect(1);
expectThrows(TaskCancelledException.class, () -> cancellableCollector.getLeafCollector(reader.leaves().get(1)));
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
@ -175,11 +176,12 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
try {
QuerySearchResultProvider querySearchResultProvider = service.executeQueryPhase(
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY)));
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY)),
new SearchTask(123L, "", "", "", null));
IntArrayList intCursors = new IntArrayList(1);
intCursors.add(0);
ShardFetchRequest req = new ShardFetchRequest(querySearchResultProvider.id(), intCursors, null /* not a scroll */);
service.executeFetchPhase(req);
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null));
} catch (AlreadyClosedException ex) {
throw ex;
} catch (IllegalStateException ex) {

View File

@ -40,6 +40,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestSearchContext;
@ -54,6 +55,7 @@ public class QueryPhaseTests extends ESTestCase {
TestSearchContext context = new TestSearchContext(null);
context.parsedQuery(new ParsedQuery(query));
context.setSize(0);
context.setTask(new SearchTask(123L, "", "", "", null));
IndexSearcher searcher = new IndexSearcher(reader);
final AtomicBoolean collected = new AtomicBoolean();
@ -123,6 +125,7 @@ public class QueryPhaseTests extends ESTestCase {
TestSearchContext context = new TestSearchContext(null);
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
context.setSize(0);
context.setTask(new SearchTask(123L, "", "", "", null));
final AtomicBoolean collected = new AtomicBoolean();
IndexSearcher contextSearcher = new IndexSearcher(new MultiReader()) {
@ -146,6 +149,7 @@ public class QueryPhaseTests extends ESTestCase {
TestSearchContext context = new TestSearchContext(null);
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
context.setSize(0);
context.setTask(new SearchTask(123L, "", "", "", null));
final AtomicBoolean collected = new AtomicBoolean();
IndexSearcher contextSearcher = new IndexSearcher(new MultiReader()) {

View File

@ -108,6 +108,7 @@ GET _cat/tasks
// CONSOLE
[float]
[[task-cancellation]]
=== Task Cancellation
If a long-running task supports cancellation, it can be cancelled by the following command:

View File

@ -92,6 +92,19 @@ timeout. The setting key is `search.default_search_timeout` and can be
set using the <<cluster-update-settings>> endpoints. Setting this value
to `-1` resets the global search timeout to no timeout.
[float]
[[global-search-cancellation]]
== Search Cancellation
Searches can be cancelled using standard <<task-cancellation,task cancellation>>
mechanism. By default, a running search only checks if it is cancelled or
not on segment boundaries, therefore the cancellation can be delayed by large
segments. The search cancellation responsiveness can be improved by setting
the dynamic cluster-level setting `search.low_level_cancellation` to `true`.
However, it comes with an additional overhead of more frequent cancellation
checks that can be noticeable on large fast running search queries. Changing this
setting only affects the searches that start after the change is made.
--
include::search/search.asciidoc[]

View File

@ -118,9 +118,16 @@ This will yield the following result:
"rewrite_time": 51443,
"collector": [
{
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time": "0.06989100000ms"
"name": "CancellableCollector",
"reason": "search_cancelled",
"time": "0.3043110000ms",
"children": [
{
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time": "0.03227300000ms"
}
]
}
]
}
@ -150,7 +157,8 @@ This will yield the following result:
// TESTRESPONSE[s/"build_scorer": 42602/"build_scorer": $body.profile.shards.0.searches.0.query.0.children.1.breakdown.build_scorer/]
// TESTRESPONSE[s/"create_weight": 89323/"create_weight": $body.profile.shards.0.searches.0.query.0.children.1.breakdown.create_weight/]
// TESTRESPONSE[s/"next_doc": 2852/"next_doc": $body.profile.shards.0.searches.0.query.0.children.1.breakdown.next_doc/]
// TESTRESPONSE[s/"time": "0.06989100000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/]
// TESTRESPONSE[s/"time": "0.3043110000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/]
// TESTRESPONSE[s/"time": "0.03227300000ms"/"time": $body.profile.shards.0.searches.0.collector.0.children.0.time/]
// Sorry for this mess....
<1> Search results are returned, but were omitted here for brevity
@ -390,21 +398,30 @@ Looking at the previous example:
[source,js]
--------------------------------------------------
"collector": [
{
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time": "0.06989100000ms"
}
{
"name": "CancellableCollector",
"reason": "search_cancelled",
"time": "0.3043110000ms",
"children": [
{
"name": "SimpleTopScoreDocCollector",
"reason": "search_top_hits",
"time": "0.03227300000ms"
}
]
}
]
--------------------------------------------------
// TESTRESPONSE[s/^/{\n"took": $body.took,\n"timed_out": $body.timed_out,\n"_shards": $body._shards,\n"hits": $body.hits,\n"profile": {\n"shards": [ {\n"id": "$body.profile.shards.0.id",\n"searches": [{\n"query": $body.profile.shards.0.searches.0.query,\n"rewrite_time": $body.profile.shards.0.searches.0.rewrite_time,/]
// TESTRESPONSE[s/]$/]}], "aggregations": []}]}}/]
// TESTRESPONSE[s/"time": "0.06989100000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/]
// TESTRESPONSE[s/"time": "0.3043110000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/]
// TESTRESPONSE[s/"time": "0.03227300000ms"/"time": $body.profile.shards.0.searches.0.collector.0.children.0.time/]
We see a single collector named `SimpleTopScoreDocCollector`. This is the default "scoring and sorting" Collector
used by Elasticsearch. The `"reason"` field attempts to give a plain english description of the class name. The
We see a single collector named `SimpleTopScoreDocCollector` wrapped into `CancellableCollector`. `SimpleTopScoreDocCollector` is the default "scoring and sorting"
`Collector` used by Elasticsearch. The `"reason"` field attempts to give a plain english description of the class name. The
`"time"` is similar to the time in the Query tree: a wall-clock time inclusive of all children. Similarly, `children` lists
all sub-collectors.
all sub-collectors. The `CancellableCollector` that wraps `SimpleTopScoreDocCollector` is used by elasticsearch to detect if the current
search was cancelled and stop collecting documents as soon as it occurs.
It should be noted that Collector times are **independent** from the Query times. They are calculated, combined
and normalized independently! Due to the nature of Lucene's execution, it is impossible to "merge" the times

View File

@ -22,6 +22,7 @@ import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.unit.TimeValue;
@ -80,6 +81,7 @@ public class TestSearchContext extends SearchContext {
ParsedQuery postFilter;
Query query;
Float minScore;
SearchTask task;
ContextIndexSearcher searcher;
int size;
@ -324,6 +326,11 @@ public class TestSearchContext extends SearchContext {
this.terminateAfter = terminateAfter;
}
@Override
public boolean lowLevelCancellation() {
return false;
}
@Override
public SearchContext minimumScore(float minimumScore) {
this.minScore = minimumScore;
@ -571,4 +578,18 @@ public class TestSearchContext extends SearchContext {
return queryShardContext;
}
@Override
public void setTask(SearchTask task) {
this.task = task;
}
@Override
public SearchTask getTask() {
return task;
}
@Override
public boolean isCancelled() {
return task.isCancelled();
}
}