Remove search operation threading option

Search operation threading is an option that is not really used, and current non default implementations are flawed. Handling it also creates quite the complexity in the search handling codebase...
This is a breaking change, but one that is actually a good one, since I haven't seen/heard anybody use it, and if its used, its problematic...
closes #6042
This commit is contained in:
Shay Banon 2014-05-05 11:04:37 +02:00
parent cea2d21c50
commit 7ce8306bc5
16 changed files with 74 additions and 647 deletions

View File

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
/**
* Controls the operation threading model for search operation that are performed
* locally on the executing node.
*
*
*/
public enum SearchOperationThreading {
/**
* No threads are used, all the local shards operations will be performed on the calling
* thread.
*/
NO_THREADS((byte) 0),
/**
* The local shards operations will be performed in serial manner on a single forked thread.
*/
SINGLE_THREAD((byte) 1),
/**
* Each local shard operation will execute on its own thread.
*/
THREAD_PER_SHARD((byte) 2);
private final byte id;
SearchOperationThreading(byte id) {
this.id = id;
}
public byte id() {
return this.id;
}
public static SearchOperationThreading fromId(byte id) {
if (id == 0) {
return NO_THREADS;
}
if (id == 1) {
return SINGLE_THREAD;
}
if (id == 2) {
return THREAD_PER_SHARD;
}
throw new ElasticsearchIllegalArgumentException("No type matching id [" + id + "]");
}
public static SearchOperationThreading fromString(String value, @Nullable SearchOperationThreading defaultValue) {
if (value == null) {
return defaultValue;
}
if ("no_threads".equals(value) || "noThreads".equals(value)) {
return NO_THREADS;
} else if ("single_thread".equals(value) || "singleThread".equals(value)) {
return SINGLE_THREAD;
} else if ("thread_per_shard".equals(value) || "threadPerShard".equals(value)) {
return THREAD_PER_SHARD;
}
throw new ElasticsearchIllegalArgumentException("No value for search operation threading matching [" + value + "]");
}
}

View File

@ -83,8 +83,6 @@ public class SearchRequest extends ActionRequest<SearchRequest> {
private String[] types = Strings.EMPTY_ARRAY;
private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD;
private IndicesOptions indicesOptions = IndicesOptions.strict();
public SearchRequest() {
@ -133,12 +131,6 @@ public class SearchRequest extends ActionRequest<SearchRequest> {
}
}
/**
* Internal.
*/
public void beforeLocalFork() {
}
/**
* Sets the indices the search will be executed on.
*/
@ -156,29 +148,6 @@ public class SearchRequest extends ActionRequest<SearchRequest> {
return this;
}
/**
* Controls the the search operation threading model.
*/
public SearchOperationThreading operationThreading() {
return this.operationThreading;
}
/**
* Controls the the search operation threading model.
*/
public SearchRequest operationThreading(SearchOperationThreading operationThreading) {
this.operationThreading = operationThreading;
return this;
}
/**
* Sets the string representation of the operation threading model. Can be one of
* "no_threads", "single_thread" and "thread_per_shard".
*/
public SearchRequest operationThreading(String operationThreading) {
return operationThreading(SearchOperationThreading.fromString(operationThreading, this.operationThreading));
}
public IndicesOptions indicesOptions() {
return indicesOptions;
}
@ -509,7 +478,9 @@ public class SearchRequest extends ActionRequest<SearchRequest> {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operationThreading = SearchOperationThreading.fromId(in.readByte());
if (in.getVersion().before(Version.V_1_2_0)) {
in.readByte(); // backward comp. for operation threading
}
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
@ -546,7 +517,9 @@ public class SearchRequest extends ActionRequest<SearchRequest> {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(operationThreading.id());
if (out.getVersion().before(Version.V_1_2_0)) {
out.writeByte((byte) 2); // operation threading
}
out.writeByte(searchType.id());
out.writeVInt(indices.length);

View File

@ -155,23 +155,6 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return this;
}
/**
* Controls the the search operation threading model.
*/
public SearchRequestBuilder setOperationThreading(SearchOperationThreading operationThreading) {
request.operationThreading(operationThreading);
return this;
}
/**
* Sets the string representation of the operation threading model. Can be one of
* "no_threads", "single_thread" and "thread_per_shard".
*/
public SearchRequestBuilder setOperationThreading(String operationThreading) {
request.operationThreading(operationThreading);
return this;
}
/**
* Specifies what type of requested indices to ignore and wildcard indices expressions.
*

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
@ -37,11 +38,8 @@ import static org.elasticsearch.search.Scroll.readScroll;
public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
private String scrollId;
private Scroll scroll;
private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD;
public SearchScrollRequest() {
}
@ -58,21 +56,6 @@ public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
return validationException;
}
/**
* Controls the the search operation threading model.
*/
public SearchOperationThreading operationThreading() {
return this.operationThreading;
}
/**
* Controls the the search operation threading model.
*/
public SearchScrollRequest operationThreading(SearchOperationThreading operationThreading) {
this.operationThreading = operationThreading;
return this;
}
/**
* The scroll id used to scroll the search.
*/
@ -117,7 +100,9 @@ public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operationThreading = SearchOperationThreading.fromId(in.readByte());
if (in.getVersion().before(Version.V_1_2_0)) {
in.readByte(); // backward comp. for operation threading
}
scrollId = in.readString();
if (in.readBoolean()) {
scroll = readScroll(in);
@ -127,7 +112,9 @@ public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(operationThreading.id());
if (out.getVersion().before(Version.V_1_2_0)) {
out.writeByte((byte) 2); // operation threading
}
out.writeString(scrollId);
if (scroll == null) {
out.writeBoolean(false);

View File

@ -39,14 +39,6 @@ public class SearchScrollRequestBuilder extends ActionRequestBuilder<SearchScrol
super((InternalClient) client, new SearchScrollRequest(scrollId));
}
/**
* Controls the the search operation threading model.
*/
public SearchScrollRequestBuilder setOperationThreading(SearchOperationThreading operationThreading) {
request.operationThreading(operationThreading);
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -43,19 +43,12 @@ import static org.elasticsearch.action.search.SearchType.*;
public class TransportSearchAction extends TransportAction<SearchRequest, SearchResponse> {
private final ClusterService clusterService;
private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction;
private final TransportSearchQueryThenFetchAction queryThenFetchAction;
private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction;
private final TransportSearchQueryAndFetchAction queryAndFetchAction;
private final TransportSearchScanAction scanAction;
private final TransportSearchCountAction countAction;
private final boolean optimizeSingleShard;
@Inject
@ -128,10 +121,6 @@ public class TransportSearchAction extends TransportAction<SearchRequest, Search
public void messageReceived(SearchRequest request, final TransportChannel channel) throws Exception {
// no need for a threaded listener
request.listenerThreaded(false);
// we don't spawn, so if we get a request with no threading, change it to single threaded
if (request.operationThreading() == SearchOperationThreading.NO_THREADS) {
request.operationThreading(SearchOperationThreading.SINGLE_THREAD);
}
execute(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse result) {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
@ -82,58 +81,13 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
int localOperations = 0;
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
final DfsSearchResult dfsResult = entry.value;
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
});
} else {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
} catch (Throwable t) {
onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter);
}
}
}
}
}
}
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new SearchServiceListener<QueryFetchSearchResult>() {

View File

@ -22,7 +22,10 @@ package org.elasticsearch.action.search.type;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
@ -87,61 +90,14 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
int localOperations = 0;
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
final DfsSearchResult dfsResult = entry.value;
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
});
} else {
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
} catch (Throwable t) {
onQueryFailure(t, querySearchRequest, entry.index, dfsResult, counter);
}
}
}
}
}
}
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener<QuerySearchResult>() {
@Override
@ -196,60 +152,14 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
request, sortedShardList, firstResults.length()
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
int localOperations = 0;
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
final QuerySearchResult queryResult = queryResults.get(entry.index);
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
} catch (Throwable t) {
onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter);
}
}
}
}
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
@ -95,61 +94,14 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
request, sortedShardList, firstResults.length()
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
int localOperations = 0;
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
final QuerySearchResult queryResult = firstResults.get(entry.index);
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
} catch (Throwable t) {
docIdsToLoad.set(entry.index, null); // clear it, we didn't manage to do anything with it
onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter);
}
}
}
}
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
@ -37,7 +36,6 @@ import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -49,19 +47,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna
*/
public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportSearchScrollQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
public TransportSearchScrollQueryAndFetchAction(Settings settings, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
@ -128,7 +121,6 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
return;
}
int localOperations = 0;
Tuple<String, Long>[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
Tuple<String, Long> target = context[i];
@ -137,11 +129,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
useSlowScroll = true;
}
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executePhase(i, node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]");
@ -153,48 +141,6 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
Tuple<String, Long> target = context1[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(i, node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(shardIndex, node, target.v2());
}
});
} else {
executePhase(shardIndex, node, target.v2());
}
} catch (Throwable t) {
onPhaseFailure(t, target.v2(), shardIndex);
}
}
}
}
}
for (Tuple<String, Long> target : scrollId.getContext()) {
DiscoveryNode node = nodes.get(target.v1());
if (node == null) {

View File

@ -39,7 +39,6 @@ import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -51,19 +50,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna
*/
public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportSearchScrollQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
@ -134,7 +128,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length);
int localOperations = 0;
Tuple<String, Long>[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
Tuple<String, Long> target = context[i];
@ -143,11 +136,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
useSlowScroll = true;
}
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executeQueryPhase(i, counter, node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]");
@ -163,48 +152,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
Tuple<String, Long> target = context1[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executeQueryPhase(i, counter, node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQueryPhase(shardIndex, counter, node, target.v2());
}
});
} else {
executeQueryPhase(shardIndex, counter, node, target.v2());
}
} catch (Throwable t) {
onQueryPhaseFailure(shardIndex, counter, target.v2(), t);
}
}
}
}
}
}
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -36,7 +39,6 @@ import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.List;
@ -49,19 +51,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna
*/
public class TransportSearchScrollScanAction extends AbstractComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
public TransportSearchScrollScanAction(Settings settings, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
@ -127,17 +124,12 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
return;
}
int localOperations = 0;
Tuple<String, Long>[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
Tuple<String, Long> target = context[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executePhase(i, node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]");
@ -149,48 +141,6 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
Tuple<String, Long> target = context1[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(i, node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(shardIndex, node, target.v2());
}
});
} else {
executePhase(shardIndex, node, target.v2());
}
} catch (Throwable t) {
onPhaseFailure(t, target.v2(), shardIndex);
}
}
}
}
}
for (Tuple<String, Long> target : scrollId.getContext()) {
DiscoveryNode node = nodes.get(target.v1());
if (node == null) {

View File

@ -145,75 +145,17 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
return;
}
request.beforeStart();
// count the local operations, and perform the non local ones
int localOperations = 0;
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.firstOrNull();
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
// do the remote operation here, the localAsync flag is not relevant
performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull());
}
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
// we have local operations, perform them now
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
request.beforeLocalFork();
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull());
}
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
if (localAsync) {
request.beforeLocalFork();
}
shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final int fShardIndex = shardIndex;
ShardRouting first = shardIt.firstOrNull();
if (first != null) {
if (first.currentNodeId().equals(nodes.localNodeId())) {
final ShardRouting shard = shardIt.nextOrNull();
if (localAsync) {
try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
performFirstPhase(fShardIndex, shardIt, shard);
}
});
} catch (Throwable t) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t);
}
} else {
performFirstPhase(fShardIndex, shardIt, shard);
}
}
}
}
}
}
}
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
@ -305,12 +247,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
if (!lastShard) {
try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
performFirstPhase(shardIndex, shardIt, nextShard);
}
});
} catch (Throwable t1) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1);
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.rest.action.search;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
@ -72,14 +71,6 @@ public class RestSearchAction extends BaseRestHandler {
SearchRequest searchRequest;
searchRequest = RestSearchAction.parseSearchRequest(request);
searchRequest.listenerThreaded(false);
SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operation_threading"), null);
if (operationThreading != null) {
if (operationThreading == SearchOperationThreading.NO_THREADS) {
// since we don't spawn, don't allow no_threads, but change it to a single thread
operationThreading = SearchOperationThreading.SINGLE_THREAD;
}
searchRequest.operationThreading(operationThreading);
}
client.search(searchRequest, new RestToXContentListener<SearchResponse>(channel));
}

View File

@ -19,21 +19,19 @@
package org.elasticsearch.rest.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import org.elasticsearch.search.Scroll;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
@ -65,14 +63,6 @@ public class RestSearchScrollAction extends BaseRestHandler {
if (scroll != null) {
searchScrollRequest.scroll(new Scroll(parseTimeValue(scroll, null)));
}
SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operation_threading"), null);
if (operationThreading != null) {
if (operationThreading == SearchOperationThreading.NO_THREADS) {
// since we don't spawn, don't allow no_threads, but change it to a single thread
operationThreading = SearchOperationThreading.SINGLE_THREAD;
}
searchScrollRequest.operationThreading(operationThreading);
}
client.searchScroll(searchScrollRequest, new RestToXContentListener<SearchResponse>(channel));
}
}

View File

@ -68,37 +68,16 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
}
private static <T> void execute(Callable<? extends T> callable, SearchServiceListener<T> listener) {
// Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on
// these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure
T result = null;
Throwable error = null;
try {
result = callable.call();
} catch (Throwable t) {
error = t;
} finally {
if (result == null) {
assert error != null;
listener.onFailure(error);
} else {
assert error == null : error;
listener.onResult(result);
}
}
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterService clusterService;
private final SearchService searchService;
private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger);
@Inject
public SearchServiceTransportAction(Settings settings, TransportService transportService, ClusterService clusterService, SearchService searchService) {
public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterService = clusterService;
this.searchService = searchService;
@ -523,6 +502,35 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
}
private <T> void execute(final Callable<? extends T> callable, final SearchServiceListener<T> listener) {
try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
// Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on
// these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure
T result = null;
Throwable error = null;
try {
result = callable.call();
} catch (Throwable t) {
error = t;
} finally {
if (result == null) {
assert error != null;
listener.onFailure(error);
} else {
assert error == null : error;
listener.onResult(result);
}
}
}
});
} catch (Throwable t) {
listener.onFailure(t);
}
}
class SearchFreeContextRequest extends TransportRequest {
private long id;