From 7ce8306bc5bace5fc741b5e16932de6d39520ca1 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 5 May 2014 11:04:37 +0200 Subject: [PATCH] Remove search operation threading option Search operation threading is an option that is not really used, and current non default implementations are flawed. Handling it also creates quite the complexity in the search handling codebase... This is a breaking change, but one that is actually a good one, since I haven't seen/heard anybody use it, and if its used, its problematic... closes #6042 --- .../search/SearchOperationThreading.java | 82 -------------- .../action/search/SearchRequest.java | 39 +------ .../action/search/SearchRequestBuilder.java | 17 --- .../action/search/SearchScrollRequest.java | 27 ++--- .../search/SearchScrollRequestBuilder.java | 8 -- .../action/search/TransportSearchAction.java | 11 -- ...TransportSearchDfsQueryAndFetchAction.java | 50 +-------- ...ransportSearchDfsQueryThenFetchAction.java | 106 ++---------------- .../TransportSearchQueryThenFetchAction.java | 52 +-------- ...nsportSearchScrollQueryAndFetchAction.java | 58 +--------- ...sportSearchScrollQueryThenFetchAction.java | 57 +--------- .../type/TransportSearchScrollScanAction.java | 62 +--------- .../type/TransportSearchTypeAction.java | 69 +----------- .../rest/action/search/RestSearchAction.java | 9 -- .../action/search/RestSearchScrollAction.java | 18 +-- .../action/SearchServiceTransportAction.java | 56 +++++---- 16 files changed, 74 insertions(+), 647 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java diff --git a/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java b/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java deleted file mode 100644 index 23380384040..00000000000 --- a/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search; - -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.common.Nullable; - -/** - * Controls the operation threading model for search operation that are performed - * locally on the executing node. - * - * - */ -public enum SearchOperationThreading { - /** - * No threads are used, all the local shards operations will be performed on the calling - * thread. - */ - NO_THREADS((byte) 0), - /** - * The local shards operations will be performed in serial manner on a single forked thread. - */ - SINGLE_THREAD((byte) 1), - /** - * Each local shard operation will execute on its own thread. - */ - THREAD_PER_SHARD((byte) 2); - - private final byte id; - - SearchOperationThreading(byte id) { - this.id = id; - } - - public byte id() { - return this.id; - } - - public static SearchOperationThreading fromId(byte id) { - if (id == 0) { - return NO_THREADS; - } - if (id == 1) { - return SINGLE_THREAD; - } - if (id == 2) { - return THREAD_PER_SHARD; - } - throw new ElasticsearchIllegalArgumentException("No type matching id [" + id + "]"); - } - - public static SearchOperationThreading fromString(String value, @Nullable SearchOperationThreading defaultValue) { - if (value == null) { - return defaultValue; - } - if ("no_threads".equals(value) || "noThreads".equals(value)) { - return NO_THREADS; - } else if ("single_thread".equals(value) || "singleThread".equals(value)) { - return SINGLE_THREAD; - } else if ("thread_per_shard".equals(value) || "threadPerShard".equals(value)) { - return THREAD_PER_SHARD; - } - throw new ElasticsearchIllegalArgumentException("No value for search operation threading matching [" + value + "]"); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 20efe76c79b..7929cdac5cd 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -83,8 +83,6 @@ public class SearchRequest extends ActionRequest { private String[] types = Strings.EMPTY_ARRAY; - private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD; - private IndicesOptions indicesOptions = IndicesOptions.strict(); public SearchRequest() { @@ -133,12 +131,6 @@ public class SearchRequest extends ActionRequest { } } - /** - * Internal. - */ - public void beforeLocalFork() { - } - /** * Sets the indices the search will be executed on. */ @@ -156,29 +148,6 @@ public class SearchRequest extends ActionRequest { return this; } - /** - * Controls the the search operation threading model. - */ - public SearchOperationThreading operationThreading() { - return this.operationThreading; - } - - /** - * Controls the the search operation threading model. - */ - public SearchRequest operationThreading(SearchOperationThreading operationThreading) { - this.operationThreading = operationThreading; - return this; - } - - /** - * Sets the string representation of the operation threading model. Can be one of - * "no_threads", "single_thread" and "thread_per_shard". - */ - public SearchRequest operationThreading(String operationThreading) { - return operationThreading(SearchOperationThreading.fromString(operationThreading, this.operationThreading)); - } - public IndicesOptions indicesOptions() { return indicesOptions; } @@ -509,7 +478,9 @@ public class SearchRequest extends ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - operationThreading = SearchOperationThreading.fromId(in.readByte()); + if (in.getVersion().before(Version.V_1_2_0)) { + in.readByte(); // backward comp. for operation threading + } searchType = SearchType.fromId(in.readByte()); indices = new String[in.readVInt()]; @@ -546,7 +517,9 @@ public class SearchRequest extends ActionRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(operationThreading.id()); + if (out.getVersion().before(Version.V_1_2_0)) { + out.writeByte((byte) 2); // operation threading + } out.writeByte(searchType.id()); out.writeVInt(indices.length); diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 4a316cefb7b..25a25439ed5 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -155,23 +155,6 @@ public class SearchRequestBuilder extends ActionRequestBuilder { private String scrollId; - private Scroll scroll; - private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD; - public SearchScrollRequest() { } @@ -58,21 +56,6 @@ public class SearchScrollRequest extends ActionRequest { return validationException; } - /** - * Controls the the search operation threading model. - */ - public SearchOperationThreading operationThreading() { - return this.operationThreading; - } - - /** - * Controls the the search operation threading model. - */ - public SearchScrollRequest operationThreading(SearchOperationThreading operationThreading) { - this.operationThreading = operationThreading; - return this; - } - /** * The scroll id used to scroll the search. */ @@ -117,7 +100,9 @@ public class SearchScrollRequest extends ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - operationThreading = SearchOperationThreading.fromId(in.readByte()); + if (in.getVersion().before(Version.V_1_2_0)) { + in.readByte(); // backward comp. for operation threading + } scrollId = in.readString(); if (in.readBoolean()) { scroll = readScroll(in); @@ -127,7 +112,9 @@ public class SearchScrollRequest extends ActionRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(operationThreading.id()); + if (out.getVersion().before(Version.V_1_2_0)) { + out.writeByte((byte) 2); // operation threading + } out.writeString(scrollId); if (scroll == null) { out.writeBoolean(false); diff --git a/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java index fa75149a0a9..ab05c95d3bb 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java @@ -39,14 +39,6 @@ public class SearchScrollRequestBuilder extends ActionRequestBuilder { private final ClusterService clusterService; - private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction; - private final TransportSearchQueryThenFetchAction queryThenFetchAction; - private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction; - private final TransportSearchQueryAndFetchAction queryAndFetchAction; - private final TransportSearchScanAction scanAction; - private final TransportSearchCountAction countAction; - private final boolean optimizeSingleShard; @Inject @@ -128,10 +121,6 @@ public class TransportSearchAction extends TransportAction() { @Override public void onResponse(SearchResponse result) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index d29355d851f..5ea504c8d19 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; @@ -82,56 +81,11 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - int localOperations = 0; for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : firstResults.asList()) { - final DfsSearchResult dfsResult = entry.value; - final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - }); - } else { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } catch (Throwable t) { - onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter); - } - } - } - } + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 802333338da..2ceb7e363bb 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -22,7 +22,10 @@ package org.elasticsearch.action.search.type; import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -87,58 +90,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA protected void moveToSecondPhase() { final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - - int localOperations = 0; for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : firstResults.asList()) { - final DfsSearchResult dfsResult = entry.value; - final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - }); - } else { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } catch (Throwable t) { - onQueryFailure(t, querySearchRequest, entry.index, dfsResult, counter); - } - } - } - } + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } } @@ -196,57 +152,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA request, sortedShardList, firstResults.length() ); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - int localOperations = 0; for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = queryResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - final QuerySearchResult queryResult = queryResults.get(entry.index); - final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } catch (Throwable t) { - onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); - } - } - } - } + FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index a122cad2af1..b4ddd623f8a 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; @@ -95,58 +94,11 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi request, sortedShardList, firstResults.length() ); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - int localOperations = 0; for (AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = firstResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - final QuerySearchResult queryResult = firstResults.get(entry.index); - final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } catch (Throwable t) { - docIdsToLoad.set(entry.index, null); // clear it, we didn't manage to do anything with it - onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); - } - } - } - } + FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index d22a2ac3908..7cdc3eccd95 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.search.type; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; @@ -37,7 +36,6 @@ import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -49,19 +47,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna */ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollQueryAndFetchAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -128,7 +121,6 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent return; } - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; @@ -137,11 +129,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) { useSlowScroll = true; } - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executePhase(i, node, target.v2()); - } + executePhase(i, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -153,48 +141,6 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(i, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); - } - } catch (Throwable t) { - onPhaseFailure(t, target.v2(), shardIndex); - } - } - } - } - } - for (Tuple target : scrollId.getContext()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 473725f8c21..91a76d4901b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -39,7 +39,6 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -51,19 +50,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna */ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -134,7 +128,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; @@ -143,11 +136,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) { useSlowScroll = true; } - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executeQueryPhase(i, counter, node, target.v2()); - } + executeQueryPhase(i, counter, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -163,48 +152,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } } } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executeQueryPhase(i, counter, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQueryPhase(shardIndex, counter, node, target.v2()); - } - }); - } else { - executeQueryPhase(shardIndex, counter, node, target.v2()); - } - } catch (Throwable t) { - onQueryPhaseFailure(shardIndex, counter, target.v2(), t); - } - } - } - } - } } private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index eeb456a14ca..915bc61dbe2 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -21,7 +21,10 @@ package org.elasticsearch.action.search.type; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -36,7 +39,6 @@ import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.List; @@ -49,19 +51,14 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.interna */ public class TransportSearchScrollScanAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollScanAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -127,17 +124,12 @@ public class TransportSearchScrollScanAction extends AbstractComponent { return; } - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null) { - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executePhase(i, node, target.v2()); - } + executePhase(i, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -149,48 +141,6 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(i, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); - } - } catch (Throwable t) { - onPhaseFailure(t, target.v2(), shardIndex); - } - } - } - } - } - for (Tuple target : scrollId.getContext()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 3c9aa834fff..3101305d5ab 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -145,75 +145,17 @@ public abstract class TransportSearchTypeAction extends TransportAction 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - request.beforeLocalFork(); - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - int shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final ShardRouting shard = shardIt.firstOrNull(); - if (shard != null) { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull()); - } - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - if (localAsync) { - request.beforeLocalFork(); - } - shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final int fShardIndex = shardIndex; - ShardRouting first = shardIt.firstOrNull(); - if (first != null) { - if (first.currentNodeId().equals(nodes.localNodeId())) { - final ShardRouting shard = shardIt.nextOrNull(); - if (localAsync) { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - performFirstPhase(fShardIndex, shardIt, shard); - } - }); - } catch (Throwable t) { - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t); - } - } else { - performFirstPhase(fShardIndex, shardIt, shard); - } - } - } - } - } - } } void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { @@ -305,12 +247,7 @@ public abstract class TransportSearchTypeAction extends TransportAction(channel)); } diff --git a/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java b/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java index 0c41da18357..8d2e001e6b9 100644 --- a/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java +++ b/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java @@ -19,21 +19,19 @@ package org.elasticsearch.rest.action.search; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.rest.*; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestToXContentListener; import org.elasticsearch.search.Scroll; -import java.io.IOException; - import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -65,14 +63,6 @@ public class RestSearchScrollAction extends BaseRestHandler { if (scroll != null) { searchScrollRequest.scroll(new Scroll(parseTimeValue(scroll, null))); } - SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operation_threading"), null); - if (operationThreading != null) { - if (operationThreading == SearchOperationThreading.NO_THREADS) { - // since we don't spawn, don't allow no_threads, but change it to a single thread - operationThreading = SearchOperationThreading.SINGLE_THREAD; - } - searchScrollRequest.operationThreading(operationThreading); - } client.searchScroll(searchScrollRequest, new RestToXContentListener(channel)); } } diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 890c9f406e4..19fdd8ffd11 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -68,37 +68,16 @@ public class SearchServiceTransportAction extends AbstractComponent { } } - private static void execute(Callable callable, SearchServiceListener listener) { - // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on - // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure - T result = null; - Throwable error = null; - try { - result = callable.call(); - } catch (Throwable t) { - error = t; - } finally { - if (result == null) { - assert error != null; - listener.onFailure(error); - } else { - assert error == null : error; - listener.onResult(result); - } - } - } - + private final ThreadPool threadPool; private final TransportService transportService; - private final ClusterService clusterService; - private final SearchService searchService; - private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger); @Inject - public SearchServiceTransportAction(Settings settings, TransportService transportService, ClusterService clusterService, SearchService searchService) { + public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) { super(settings); + this.threadPool = threadPool; this.transportService = transportService; this.clusterService = clusterService; this.searchService = searchService; @@ -523,6 +502,35 @@ public class SearchServiceTransportAction extends AbstractComponent { } } + private void execute(final Callable callable, final SearchServiceListener listener) { + try { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on + // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure + T result = null; + Throwable error = null; + try { + result = callable.call(); + } catch (Throwable t) { + error = t; + } finally { + if (result == null) { + assert error != null; + listener.onFailure(error); + } else { + assert error == null : error; + listener.onResult(result); + } + } + } + }); + } catch (Throwable t) { + listener.onFailure(t); + } + } + class SearchFreeContextRequest extends TransportRequest { private long id;