diff --git a/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java b/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java deleted file mode 100644 index 23380384040..00000000000 --- a/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search; - -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.common.Nullable; - -/** - * Controls the operation threading model for search operation that are performed - * locally on the executing node. - * - * - */ -public enum SearchOperationThreading { - /** - * No threads are used, all the local shards operations will be performed on the calling - * thread. - */ - NO_THREADS((byte) 0), - /** - * The local shards operations will be performed in serial manner on a single forked thread. - */ - SINGLE_THREAD((byte) 1), - /** - * Each local shard operation will execute on its own thread. - */ - THREAD_PER_SHARD((byte) 2); - - private final byte id; - - SearchOperationThreading(byte id) { - this.id = id; - } - - public byte id() { - return this.id; - } - - public static SearchOperationThreading fromId(byte id) { - if (id == 0) { - return NO_THREADS; - } - if (id == 1) { - return SINGLE_THREAD; - } - if (id == 2) { - return THREAD_PER_SHARD; - } - throw new ElasticsearchIllegalArgumentException("No type matching id [" + id + "]"); - } - - public static SearchOperationThreading fromString(String value, @Nullable SearchOperationThreading defaultValue) { - if (value == null) { - return defaultValue; - } - if ("no_threads".equals(value) || "noThreads".equals(value)) { - return NO_THREADS; - } else if ("single_thread".equals(value) || "singleThread".equals(value)) { - return SINGLE_THREAD; - } else if ("thread_per_shard".equals(value) || "threadPerShard".equals(value)) { - return THREAD_PER_SHARD; - } - throw new ElasticsearchIllegalArgumentException("No value for search operation threading matching [" + value + "]"); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 20efe76c79b..7929cdac5cd 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -83,8 +83,6 @@ public class SearchRequest extends ActionRequest { private String[] types = Strings.EMPTY_ARRAY; - private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD; - private IndicesOptions indicesOptions = IndicesOptions.strict(); public SearchRequest() { @@ -133,12 +131,6 @@ public class SearchRequest extends ActionRequest { } } - /** - * Internal. - */ - public void beforeLocalFork() { - } - /** * Sets the indices the search will be executed on. */ @@ -156,29 +148,6 @@ public class SearchRequest extends ActionRequest { return this; } - /** - * Controls the the search operation threading model. - */ - public SearchOperationThreading operationThreading() { - return this.operationThreading; - } - - /** - * Controls the the search operation threading model. - */ - public SearchRequest operationThreading(SearchOperationThreading operationThreading) { - this.operationThreading = operationThreading; - return this; - } - - /** - * Sets the string representation of the operation threading model. Can be one of - * "no_threads", "single_thread" and "thread_per_shard". - */ - public SearchRequest operationThreading(String operationThreading) { - return operationThreading(SearchOperationThreading.fromString(operationThreading, this.operationThreading)); - } - public IndicesOptions indicesOptions() { return indicesOptions; } @@ -509,7 +478,9 @@ public class SearchRequest extends ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - operationThreading = SearchOperationThreading.fromId(in.readByte()); + if (in.getVersion().before(Version.V_1_2_0)) { + in.readByte(); // backward comp. for operation threading + } searchType = SearchType.fromId(in.readByte()); indices = new String[in.readVInt()]; @@ -546,7 +517,9 @@ public class SearchRequest extends ActionRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(operationThreading.id()); + if (out.getVersion().before(Version.V_1_2_0)) { + out.writeByte((byte) 2); // operation threading + } out.writeByte(searchType.id()); out.writeVInt(indices.length); diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 4a316cefb7b..25a25439ed5 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -155,23 +155,6 @@ public class SearchRequestBuilder extends ActionRequestBuilder { private String scrollId; - private Scroll scroll; - private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD; - public SearchScrollRequest() { } @@ -58,21 +56,6 @@ public class SearchScrollRequest extends ActionRequest { return validationException; } - /** - * Controls the the search operation threading model. - */ - public SearchOperationThreading operationThreading() { - return this.operationThreading; - } - - /** - * Controls the the search operation threading model. - */ - public SearchScrollRequest operationThreading(SearchOperationThreading operationThreading) { - this.operationThreading = operationThreading; - return this; - } - /** * The scroll id used to scroll the search. */ @@ -117,7 +100,9 @@ public class SearchScrollRequest extends ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - operationThreading = SearchOperationThreading.fromId(in.readByte()); + if (in.getVersion().before(Version.V_1_2_0)) { + in.readByte(); // backward comp. for operation threading + } scrollId = in.readString(); if (in.readBoolean()) { scroll = readScroll(in); @@ -127,7 +112,9 @@ public class SearchScrollRequest extends ActionRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(operationThreading.id()); + if (out.getVersion().before(Version.V_1_2_0)) { + out.writeByte((byte) 2); // operation threading + } out.writeString(scrollId); if (scroll == null) { out.writeBoolean(false); diff --git a/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java index fa75149a0a9..ab05c95d3bb 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java @@ -39,14 +39,6 @@ public class SearchScrollRequestBuilder extends ActionRequestBuilder { private final ClusterService clusterService; - private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction; - private final TransportSearchQueryThenFetchAction queryThenFetchAction; - private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction; - private final TransportSearchQueryAndFetchAction queryAndFetchAction; - private final TransportSearchScanAction scanAction; - private final TransportSearchCountAction countAction; - private final boolean optimizeSingleShard; @Inject @@ -128,10 +121,6 @@ public class TransportSearchAction extends TransportAction() { @Override public void onResponse(SearchResponse result) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index d29355d851f..5ea504c8d19 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; @@ -82,56 +81,11 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - int localOperations = 0; for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : firstResults.asList()) { - final DfsSearchResult dfsResult = entry.value; - final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - }); - } else { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } catch (Throwable t) { - onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter); - } - } - } - } + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 802333338da..2ceb7e363bb 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -22,7 +22,10 @@ package org.elasticsearch.action.search.type; import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -87,58 +90,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA protected void moveToSecondPhase() { final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - - int localOperations = 0; for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : firstResults.asList()) { - final DfsSearchResult dfsResult = entry.value; - final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - }); - } else { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } catch (Throwable t) { - onQueryFailure(t, querySearchRequest, entry.index, dfsResult, counter); - } - } - } - } + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } } @@ -196,57 +152,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA request, sortedShardList, firstResults.length() ); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - int localOperations = 0; for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = queryResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - final QuerySearchResult queryResult = queryResults.get(entry.index); - final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } catch (Throwable t) { - onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); - } - } - } - } + FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index a122cad2af1..b4ddd623f8a 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; @@ -95,58 +94,11 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi request, sortedShardList, firstResults.length() ); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - int localOperations = 0; for (AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = firstResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - final QuerySearchResult queryResult = firstResults.get(entry.index); - final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } catch (Throwable t) { - docIdsToLoad.set(entry.index, null); // clear it, we didn't manage to do anything with it - onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); - } - } - } - } + FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index d22a2ac3908..7cdc3eccd95 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.search.type; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; @@ -37,7 +36,6 @@ import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -49,19 +47,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna */ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollQueryAndFetchAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -128,7 +121,6 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent return; } - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; @@ -137,11 +129,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) { useSlowScroll = true; } - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executePhase(i, node, target.v2()); - } + executePhase(i, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -153,48 +141,6 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(i, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); - } - } catch (Throwable t) { - onPhaseFailure(t, target.v2(), shardIndex); - } - } - } - } - } - for (Tuple target : scrollId.getContext()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 473725f8c21..91a76d4901b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -39,7 +39,6 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -51,19 +50,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna */ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -134,7 +128,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; @@ -143,11 +136,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) { useSlowScroll = true; } - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executeQueryPhase(i, counter, node, target.v2()); - } + executeQueryPhase(i, counter, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -163,48 +152,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } } } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executeQueryPhase(i, counter, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQueryPhase(shardIndex, counter, node, target.v2()); - } - }); - } else { - executeQueryPhase(shardIndex, counter, node, target.v2()); - } - } catch (Throwable t) { - onQueryPhaseFailure(shardIndex, counter, target.v2(), t); - } - } - } - } - } } private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index eeb456a14ca..915bc61dbe2 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -21,7 +21,10 @@ package org.elasticsearch.action.search.type; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -36,7 +39,6 @@ import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.List; @@ -49,19 +51,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna */ public class TransportSearchScrollScanAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollScanAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -127,17 +124,12 @@ public class TransportSearchScrollScanAction extends AbstractComponent { return; } - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null) { - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executePhase(i, node, target.v2()); - } + executePhase(i, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -149,48 +141,6 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(i, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); - } - } catch (Throwable t) { - onPhaseFailure(t, target.v2(), shardIndex); - } - } - } - } - } - for (Tuple target : scrollId.getContext()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 3c9aa834fff..3101305d5ab 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -145,75 +145,17 @@ public abstract class TransportSearchTypeAction extends TransportAction 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - request.beforeLocalFork(); - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - int shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final ShardRouting shard = shardIt.firstOrNull(); - if (shard != null) { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull()); - } - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - if (localAsync) { - request.beforeLocalFork(); - } - shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final int fShardIndex = shardIndex; - ShardRouting first = shardIt.firstOrNull(); - if (first != null) { - if (first.currentNodeId().equals(nodes.localNodeId())) { - final ShardRouting shard = shardIt.nextOrNull(); - if (localAsync) { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - performFirstPhase(fShardIndex, shardIt, shard); - } - }); - } catch (Throwable t) { - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t); - } - } else { - performFirstPhase(fShardIndex, shardIt, shard); - } - } - } - } - } - } } void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { @@ -305,12 +247,7 @@ public abstract class TransportSearchTypeAction extends TransportAction(channel)); } diff --git a/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java b/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java index 0c41da18357..8d2e001e6b9 100644 --- a/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java +++ b/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java @@ -19,21 +19,19 @@ package org.elasticsearch.rest.action.search; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.rest.*; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestToXContentListener; import org.elasticsearch.search.Scroll; -import java.io.IOException; - import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -65,14 +63,6 @@ public class RestSearchScrollAction extends BaseRestHandler { if (scroll != null) { searchScrollRequest.scroll(new Scroll(parseTimeValue(scroll, null))); } - SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operation_threading"), null); - if (operationThreading != null) { - if (operationThreading == SearchOperationThreading.NO_THREADS) { - // since we don't spawn, don't allow no_threads, but change it to a single thread - operationThreading = SearchOperationThreading.SINGLE_THREAD; - } - searchScrollRequest.operationThreading(operationThreading); - } client.searchScroll(searchScrollRequest, new RestToXContentListener(channel)); } } diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 890c9f406e4..19fdd8ffd11 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -68,37 +68,16 @@ public class SearchServiceTransportAction extends AbstractComponent { } } - private static void execute(Callable callable, SearchServiceListener listener) { - // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on - // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure - T result = null; - Throwable error = null; - try { - result = callable.call(); - } catch (Throwable t) { - error = t; - } finally { - if (result == null) { - assert error != null; - listener.onFailure(error); - } else { - assert error == null : error; - listener.onResult(result); - } - } - } - + private final ThreadPool threadPool; private final TransportService transportService; - private final ClusterService clusterService; - private final SearchService searchService; - private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger); @Inject - public SearchServiceTransportAction(Settings settings, TransportService transportService, ClusterService clusterService, SearchService searchService) { + public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) { super(settings); + this.threadPool = threadPool; this.transportService = transportService; this.clusterService = clusterService; this.searchService = searchService; @@ -523,6 +502,35 @@ public class SearchServiceTransportAction extends AbstractComponent { } } + private void execute(final Callable callable, final SearchServiceListener listener) { + try { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on + // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure + T result = null; + Throwable error = null; + try { + result = callable.call(); + } catch (Throwable t) { + error = t; + } finally { + if (result == null) { + assert error != null; + listener.onFailure(error); + } else { + assert error == null : error; + listener.onResult(result); + } + } + } + }); + } catch (Throwable t) { + listener.onFailure(t); + } + } + class SearchFreeContextRequest extends TransportRequest { private long id;