diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index e9a9813386d..8c648b9ad57 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -231,7 +231,6 @@ public class ActionModule extends AbstractModule { registerAction(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class, TransportIndexDeleteByQueryAction.class, TransportShardDeleteByQueryAction.class); registerAction(SearchAction.INSTANCE, TransportSearchAction.class, - TransportSearchCache.class, TransportSearchDfsQueryThenFetchAction.class, TransportSearchQueryThenFetchAction.class, TransportSearchDfsQueryAndFetchAction.class, diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java deleted file mode 100644 index 7b0c459fe18..00000000000 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; - -import java.util.Collection; -import java.util.Map; -import java.util.Queue; - -/** - * - */ -public class TransportSearchCache { - - private final Queue> cacheDfsResults = ConcurrentCollections.newQueue(); - - private final Queue> cacheQueryResults = ConcurrentCollections.newQueue(); - - private final Queue> cacheFetchResults = ConcurrentCollections.newQueue(); - - private final Queue> cacheQueryFetchResults = ConcurrentCollections.newQueue(); - - - public Collection obtainDfsResults() { - Collection dfsSearchResults; - while ((dfsSearchResults = cacheDfsResults.poll()) == null) { - Queue offer = ConcurrentCollections.newQueue(); - cacheDfsResults.offer(offer); - } - return dfsSearchResults; - } - - public void releaseDfsResults(Collection dfsResults) { - dfsResults.clear(); - cacheDfsResults.offer(dfsResults); - } - - public Map obtainQueryResults() { - Map queryResults; - while ((queryResults = cacheQueryResults.poll()) == null) { - cacheQueryResults.offer(ConcurrentCollections.newConcurrentMap()); - } - return queryResults; - } - - public void releaseQueryResults(Map queryResults) { - queryResults.clear(); - cacheQueryResults.offer(queryResults); - } - - public Map obtainFetchResults() { - Map fetchResults; - while ((fetchResults = cacheFetchResults.poll()) == null) { - cacheFetchResults.offer(ConcurrentCollections.newConcurrentMap()); - } - return fetchResults; - } - - public void releaseFetchResults(Map fetchResults) { - fetchResults.clear(); - cacheFetchResults.offer(fetchResults); - } - - public Map obtainQueryFetchResults() { - Map fetchResults; - while ((fetchResults = cacheQueryFetchResults.poll()) == null) { - cacheQueryFetchResults.offer(ConcurrentCollections.newConcurrentMap()); - } - return fetchResults; - } - - public void releaseQueryFetchResults(Map fetchResults) { - fetchResults.clear(); - cacheQueryFetchResults.offer(fetchResults); - } -} diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java index 4dae1ba95a7..8ffcfc74267 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java @@ -19,16 +19,14 @@ package org.elasticsearch.action.search.type; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -37,11 +35,8 @@ import org.elasticsearch.search.fetch.FetchSearchResultProvider; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; - import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; /** @@ -51,8 +46,8 @@ public class TransportSearchCountAction extends TransportSearchTypeAction { @Inject public TransportSearchCountAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController); + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { + super(settings, threadPool, clusterService, searchService, searchPhaseController); } @Override @@ -62,8 +57,6 @@ public class TransportSearchCountAction extends TransportSearchTypeAction { private class AsyncAction extends BaseAsyncAction { - private final Map queryFetchResults = searchCache.obtainQueryResults(); - private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); } @@ -78,21 +71,15 @@ public class TransportSearchCountAction extends TransportSearchTypeAction { searchService.sendExecuteQuery(node, request, listener); } - @Override - protected void processFirstPhaseResult(ShardRouting shard, QuerySearchResult result) { - queryFetchResults.put(result.shardTarget(), result); - } - @Override protected void moveToSecondPhase() throws Exception { // no need to sort, since we know we have no hits back - final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryFetchResults, ImmutableMap.of()); + final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, firstResults, (AtomicArray) AtomicArray.empty()); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null); + scrollId = buildScrollId(request.searchType(), firstResults, null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); - searchCache.releaseQueryResults(queryFetchResults); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 35ef050a47d..f135bc16fcb 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -23,10 +23,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -38,12 +37,8 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; - /** * */ @@ -51,8 +46,8 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc @Inject public TransportSearchDfsQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController); + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { + super(settings, threadPool, clusterService, searchService, searchPhaseController); } @Override @@ -62,13 +57,11 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc private class AsyncAction extends BaseAsyncAction { - private final Collection dfsResults = searchCache.obtainDfsResults(); - - private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); - + private final AtomicArray queryFetchResults; private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); + queryFetchResults = new AtomicArray(firstResults.length()); } @Override @@ -81,24 +74,20 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc searchService.sendExecuteDfs(node, request, listener); } - @Override - protected void processFirstPhaseResult(ShardRouting shard, DfsSearchResult result) { - dfsResults.add(result); - } - @Override protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsResults); - final AtomicInteger counter = new AtomicInteger(dfsResults.size()); + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); int localOperations = 0; - for (final DfsSearchResult dfsResult : dfsResults) { + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { localOperations++; } else { QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(dfsResult, counter, node, querySearchRequest); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } } if (localOperations > 0) { @@ -106,18 +95,20 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (final DfsSearchResult dfsResult : dfsResults) { + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(dfsResult, counter, node, querySearchRequest); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } } } }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final DfsSearchResult dfsResult : dfsResults) { + for (final AtomicArray.Entry entry : firstResults.asList()) { + final DfsSearchResult dfsResult = entry.value; final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); @@ -125,11 +116,11 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - executeSecondPhase(dfsResult, counter, node, querySearchRequest); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } }); } else { - executeSecondPhase(dfsResult, counter, node, querySearchRequest); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } } } @@ -137,12 +128,12 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc } } - void executeSecondPhase(final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) { + void executeSecondPhase(final int shardRequestId, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) { searchService.sendExecuteFetch(node, querySearchRequest, new SearchServiceListener() { @Override public void onResult(QueryFetchSearchResult result) { result.shardTarget(dfsResult.shardTarget()); - queryFetchResults.put(result.shardTarget(), result); + queryFetchResults.set(shardRequestId, result); if (counter.decrementAndGet() == 0) { finishHim(); } @@ -153,7 +144,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -172,17 +163,16 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc } listener.onFailure(failure); } finally { - searchCache.releaseDfsResults(dfsResults); - searchCache.releaseQueryFetchResults(queryFetchResults); + // } } void innerFinishHim() throws Exception { - sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values()); + sortedShardList = searchPhaseController.sortDocs(queryFetchResults); final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), dfsResults, null); + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 27365096f64..1e1d60ff3d9 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -23,10 +23,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -39,11 +39,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** @@ -53,8 +50,8 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA @Inject public TransportSearchDfsQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController); + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { + super(settings, threadPool, clusterService, searchService, searchPhaseController); } @Override @@ -64,16 +61,15 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA private class AsyncAction extends BaseAsyncAction { - private final Collection dfsResults = searchCache.obtainDfsResults(); - - private final Map queryResults = searchCache.obtainQueryResults(); - - private final Map fetchResults = searchCache.obtainFetchResults(); - - private volatile Map docIdsToLoad; + final AtomicArray queryResults; + final AtomicArray fetchResults; + final AtomicArray docIdsToLoad; private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); + queryResults = new AtomicArray(firstResults.length()); + fetchResults = new AtomicArray(firstResults.length()); + docIdsToLoad = new AtomicArray(firstResults.length()); } @Override @@ -86,25 +82,20 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA searchService.sendExecuteDfs(node, request, listener); } - @Override - protected void processFirstPhaseResult(ShardRouting shard, DfsSearchResult result) { - dfsResults.add(result); - } - @Override protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsResults); - final AtomicInteger counter = new AtomicInteger(dfsResults.size()); - + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); int localOperations = 0; - for (final DfsSearchResult dfsResult : dfsResults) { + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { localOperations++; } else { QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(dfsResult, counter, querySearchRequest, node); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } } @@ -113,18 +104,20 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (final DfsSearchResult dfsResult : dfsResults) { + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(dfsResult, counter, querySearchRequest, node); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } } } }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final DfsSearchResult dfsResult : dfsResults) { + for (final AtomicArray.Entry entry : firstResults.asList()) { + final DfsSearchResult dfsResult = entry.value; final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); @@ -132,11 +125,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - executeQuery(dfsResult, counter, querySearchRequest, node); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } }); } else { - executeQuery(dfsResult, counter, querySearchRequest, node); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } } } @@ -144,12 +137,12 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } } - void executeQuery(final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) { + void executeQuery(final int shardRequestId, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) { searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener() { @Override public void onResult(QuerySearchResult result) { result.shardTarget(dfsResult.shardTarget()); - queryResults.put(result.shardTarget(), result); + queryResults.set(shardRequestId, result); if (counter.decrementAndGet() == 0) { executeFetchPhase(); } @@ -160,7 +153,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { executeFetchPhase(); @@ -178,24 +171,24 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } void innerExecuteFetchPhase() { - sortedShardList = searchPhaseController.sortDocs(queryResults.values()); - final Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); - this.docIdsToLoad = docIdsToLoad; + sortedShardList = searchPhaseController.sortDocs(queryResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - if (docIdsToLoad.isEmpty()) { + if (docIdsToLoad.asList().isEmpty()) { finishHim(); return; } - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size()); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); int localOperations = 0; - for (final Map.Entry entry : docIdsToLoad.entrySet()) { - DiscoveryNode node = nodes.get(entry.getKey().nodeId()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResult queryResult = queryResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { localOperations++; } else { - FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue()); - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } @@ -204,30 +197,32 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (final Map.Entry entry : docIdsToLoad.entrySet()) { - DiscoveryNode node = nodes.get(entry.getKey().nodeId()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResult queryResult = queryResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue()); - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } } }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Map.Entry entry : docIdsToLoad.entrySet()) { - final DiscoveryNode node = nodes.get(entry.getKey().nodeId()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + final QuerySearchResult queryResult = queryResults.get(entry.index); + final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue()); + final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); if (localAsync) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } }); } else { - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } } @@ -235,12 +230,12 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } } - void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { + void executeFetch(final int shardRequestId, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener() { @Override public void onResult(FetchSearchResult result) { result.shardTarget(shardTarget); - fetchResults.put(result.shardTarget(), result); + fetchResults.set(shardRequestId, result); if (counter.decrementAndGet() == 0) { finishHim(); } @@ -251,7 +246,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -264,16 +259,13 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA try { innerFinishHim(); } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); if (logger.isDebugEnabled()) { logger.debug("failed to reduce search", failure); } listener.onFailure(failure); } finally { releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - searchCache.releaseDfsResults(dfsResults); - searchCache.releaseQueryResults(queryResults); - searchCache.releaseFetchResults(fetchResults); } } @@ -281,7 +273,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); String scrollId = null; if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults, null); + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java index 521c3f392ae..8c108fee35c 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.search.type; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; - import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CharsRef; import org.apache.lucene.util.UnicodeUtil; @@ -35,12 +34,12 @@ import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.ShardSearchRequest; import java.io.IOException; -import java.util.Collection; import java.util.Map; /** @@ -59,7 +58,7 @@ public abstract class TransportSearchHelper { return new InternalScrollSearchRequest(request, id); } - public static String buildScrollId(SearchType searchType, Collection searchPhaseResults, @Nullable Map attributes) throws IOException { + public static String buildScrollId(SearchType searchType, AtomicArray searchPhaseResults, @Nullable Map attributes) throws IOException { if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) { return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults, attributes); } else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) { @@ -71,10 +70,11 @@ public abstract class TransportSearchHelper { } } - public static String buildScrollId(String type, Collection searchPhaseResults, @Nullable Map attributes) throws IOException { + public static String buildScrollId(String type, AtomicArray searchPhaseResults, @Nullable Map attributes) throws IOException { StringBuilder sb = new StringBuilder().append(type).append(';'); - sb.append(searchPhaseResults.size()).append(';'); - for (SearchPhaseResult searchPhaseResult : searchPhaseResults) { + sb.append(searchPhaseResults.asList().size()).append(';'); + for (AtomicArray.Entry entry : searchPhaseResults.asList()) { + SearchPhaseResult searchPhaseResult = entry.value; sb.append(searchPhaseResult.id()).append(':').append(searchPhaseResult.shardTarget().nodeId()).append(';'); } if (attributes == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java index 32781d9f0d8..aab83aed1d0 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java @@ -20,14 +20,13 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.ReduceSearchPhaseException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -36,7 +35,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; +import java.io.IOException; import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; @@ -47,8 +46,8 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio @Inject public TransportSearchQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController); + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { + super(settings, threadPool, clusterService, searchService, searchPhaseController); } @Override @@ -58,9 +57,6 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio private class AsyncAction extends BaseAsyncAction { - private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); - - private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); } @@ -76,20 +72,26 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio } @Override - protected void processFirstPhaseResult(ShardRouting shard, QueryFetchSearchResult result) { - queryFetchResults.put(result.shardTarget(), result); + protected void moveToSecondPhase() throws Exception { + try { + innerFinishHim(); + } catch (Throwable e) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + listener.onFailure(failure); + } } - @Override - protected void moveToSecondPhase() throws Exception { - sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values()); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); + private void innerFinishHim() throws IOException { + sortedShardList = searchPhaseController.sortDocs(firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null); + scrollId = buildScrollId(request.searchType(), firstResults, null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); - searchCache.releaseQueryFetchResults(queryFetchResults); } } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 84ef653a7c9..e715a5fdd8a 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -23,10 +23,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -36,10 +36,8 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** @@ -49,8 +47,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi @Inject public TransportSearchQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController); + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { + super(settings, threadPool, clusterService, searchService, searchPhaseController); } @Override @@ -60,14 +58,13 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi private class AsyncAction extends BaseAsyncAction { - private final Map queryResults = searchCache.obtainQueryResults(); - - private final Map fetchResults = searchCache.obtainFetchResults(); - - private volatile Map docIdsToLoad; + final AtomicArray fetchResults; + final AtomicArray docIdsToLoad; private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); + fetchResults = new AtomicArray(firstResults.length()); + docIdsToLoad = new AtomicArray(firstResults.length()); } @Override @@ -80,32 +77,27 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi searchService.sendExecuteQuery(node, request, listener); } - @Override - protected void processFirstPhaseResult(ShardRouting shard, QuerySearchResult result) { - queryResults.put(result.shardTarget(), result); - } - @Override protected void moveToSecondPhase() { - sortedShardList = searchPhaseController.sortDocs(queryResults.values()); - final Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); - this.docIdsToLoad = docIdsToLoad; + sortedShardList = searchPhaseController.sortDocs(firstResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - if (docIdsToLoad.isEmpty()) { + if (docIdsToLoad.asList().isEmpty()) { finishHim(); return; } - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size()); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); int localOperations = 0; - for (final Map.Entry entry : docIdsToLoad.entrySet()) { - DiscoveryNode node = nodes.get(entry.getKey().nodeId()); + for (AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResult queryResult = firstResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { localOperations++; } else { - FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue()); - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } @@ -114,30 +106,32 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (final Map.Entry entry : docIdsToLoad.entrySet()) { - DiscoveryNode node = nodes.get(entry.getKey().nodeId()); + for (AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResult queryResult = firstResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue()); - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } } }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Map.Entry entry : docIdsToLoad.entrySet()) { - final DiscoveryNode node = nodes.get(entry.getKey().nodeId()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + final QuerySearchResult queryResult = firstResults.get(entry.index); + final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue()); + final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); if (localAsync) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } }); } else { - executeFetch(entry.getKey(), counter, fetchSearchRequest, node); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } } @@ -145,12 +139,12 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi } } - void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { + void executeFetch(final int shardRequestId, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener() { @Override public void onResult(FetchSearchResult result) { result.shardTarget(shardTarget); - fetchResults.put(result.shardTarget(), result); + fetchResults.set(shardRequestId, result); if (counter.decrementAndGet() == 0) { finishHim(); } @@ -161,7 +155,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -180,17 +174,15 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi } listener.onFailure(failure); } finally { - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - searchCache.releaseQueryResults(queryResults); - searchCache.releaseFetchResults(fetchResults); + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); } } void innerFinishHim() throws Exception { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); String scrollId = null; if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values(), null); + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java index a6722b9d4ad..5e408450c73 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java @@ -25,10 +25,9 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -37,19 +36,16 @@ import org.elasticsearch.search.fetch.FetchSearchResultProvider; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; - import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; public class TransportSearchScanAction extends TransportSearchTypeAction { @Inject public TransportSearchScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController); + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { + super(settings, threadPool, clusterService, searchService, searchPhaseController); } @Override @@ -59,8 +55,6 @@ public class TransportSearchScanAction extends TransportSearchTypeAction { private class AsyncAction extends BaseAsyncAction { - private final Map queryResults = searchCache.obtainQueryResults(); - private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); } @@ -75,20 +69,14 @@ public class TransportSearchScanAction extends TransportSearchTypeAction { searchService.sendExecuteScan(node, request, listener); } - @Override - protected void processFirstPhaseResult(ShardRouting shard, QuerySearchResult result) { - queryResults.put(result.shardTarget(), result); - } - @Override protected void moveToSecondPhase() throws Exception { - final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryResults, ImmutableMap.of()); + final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, firstResults, (AtomicArray) AtomicArray.empty()); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), queryResults.values(), ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits()))); + scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits()))); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); - searchCache.releaseQueryResults(queryResults); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 3a331fe4f29..34d7395fa57 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -28,8 +28,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -38,8 +37,7 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; -import java.util.Queue; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; @@ -57,16 +55,12 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent private final SearchPhaseController searchPhaseController; - private final TransportSearchCache searchCache; - @Inject public TransportSearchScrollQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache searchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; - this.searchCache = searchCache; this.searchService = searchService; this.searchPhaseController = searchPhaseController; } @@ -85,12 +79,10 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent private final DiscoveryNodes nodes; - private volatile Queue shardFailures; - - private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); + private volatile AtomicArray shardFailures; + private final AtomicArray queryFetchResults; private final AtomicInteger successfulOps; - private final AtomicInteger counter; private final long startTime = System.currentTimeMillis(); @@ -102,23 +94,29 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent this.nodes = clusterService.state().nodes(); this.successfulOps = new AtomicInteger(scrollId.getContext().length); this.counter = new AtomicInteger(scrollId.getContext().length); + + this.queryFetchResults = new AtomicArray(scrollId.getContext().length); } protected final ShardSearchFailure[] buildShardFailures() { - Queue localFailures = shardFailures; - if (localFailures == null) { + if (shardFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } - return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; } // we do our best to return the shard failures, but its ok if its not fully concurrently safe // we simply try and return as much as possible - protected final void addShardFailure(ShardSearchFailure failure) { + protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) { if (shardFailures == null) { - shardFailures = ConcurrentCollections.newQueue(); + shardFailures = new AtomicArray(scrollId.getContext().length); } - shardFailures.add(failure); + shardFailures.set(shardRequestId, failure); } public void start() { @@ -128,13 +126,15 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } int localOperations = 0; - for (Tuple target : scrollId.getContext()) { + Tuple[] context = scrollId.getContext(); + for (int i = 0; i < context.length; i++) { + Tuple target = context[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null) { if (nodes.localNodeId().equals(node.id())) { localOperations++; } else { - executePhase(node, target.v2()); + executePhase(i, node, target.v2()); } } else { if (logger.isDebugEnabled()) { @@ -152,28 +152,33 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (Tuple target : scrollId.getContext()) { + Tuple[] context1 = scrollId.getContext(); + for (int i = 0; i < context1.length; i++) { + Tuple target = context1[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(node, target.v2()); + executePhase(i, node, target.v2()); } } } }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Tuple target : scrollId.getContext()) { + Tuple[] context1 = scrollId.getContext(); + for (int i = 0; i < context1.length; i++) { + final Tuple target = context1[i]; + final int shardRequestId = i; final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - executePhase(node, target.v2()); + executePhase(shardRequestId, node, target.v2()); } }); } else { - executePhase(node, target.v2()); + executePhase(shardRequestId, node, target.v2()); } } } @@ -195,11 +200,11 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } } - private void executePhase(DiscoveryNode node, final long searchId) { + private void executePhase(final int shardRequestId, DiscoveryNode node, final long searchId) { searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener() { @Override public void onResult(QueryFetchSearchResult result) { - queryFetchResults.put(result.shardTarget(), result); + queryFetchResults.set(shardRequestId, result); if (counter.decrementAndGet() == 0) { finishHim(); } @@ -210,7 +215,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - addShardFailure(new ShardSearchFailure(t)); + addShardFailure(shardRequestId, new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -228,13 +233,12 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } private void innerFinishHim() { - ShardDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values()); + ShardDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults); final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { scrollId = request.scrollId(); } - searchCache.releaseQueryFetchResults(queryFetchResults); listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), System.currentTimeMillis() - startTime, buildShardFailures())); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 145f9802f68..b8c6d5a8ca0 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -29,8 +29,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -39,11 +38,9 @@ import org.elasticsearch.search.fetch.FetchSearchRequest; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; -import java.util.Queue; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; @@ -61,16 +58,12 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent private final SearchPhaseController searchPhaseController; - private final TransportSearchCache searchCache; - @Inject public TransportSearchScrollQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache searchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; - this.searchCache = searchCache; this.searchService = searchService; this.searchPhaseController = searchPhaseController; } @@ -89,11 +82,9 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent private final DiscoveryNodes nodes; - protected volatile Queue shardFailures; - - private final Map queryResults = searchCache.obtainQueryResults(); - - private final Map fetchResults = searchCache.obtainFetchResults(); + private volatile AtomicArray shardFailures; + final AtomicArray queryResults; + final AtomicArray fetchResults; private volatile ShardDoc[] sortedShardList; @@ -107,23 +98,30 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent this.scrollId = scrollId; this.nodes = clusterService.state().nodes(); this.successfulOps = new AtomicInteger(scrollId.getContext().length); + + this.queryResults = new AtomicArray(scrollId.getContext().length); + this.fetchResults = new AtomicArray(scrollId.getContext().length); } protected final ShardSearchFailure[] buildShardFailures() { - Queue localFailures = shardFailures; - if (localFailures == null) { + if (shardFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } - return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; } // we do our best to return the shard failures, but its ok if its not fully concurrently safe // we simply try and return as much as possible - protected final void addShardFailure(ShardSearchFailure failure) { + protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) { if (shardFailures == null) { - shardFailures = ConcurrentCollections.newQueue(); + shardFailures = new AtomicArray(scrollId.getContext().length); } - shardFailures.add(failure); + shardFailures.set(shardRequestId, failure); } public void start() { @@ -134,13 +132,15 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); int localOperations = 0; - for (Tuple target : scrollId.getContext()) { + Tuple[] context = scrollId.getContext(); + for (int i = 0; i < context.length; i++) { + Tuple target = context[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null) { if (nodes.localNodeId().equals(node.id())) { localOperations++; } else { - executeQueryPhase(counter, node, target.v2()); + executeQueryPhase(i, counter, node, target.v2()); } } else { if (logger.isDebugEnabled()) { @@ -158,28 +158,33 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (Tuple target : scrollId.getContext()) { + Tuple[] context1 = scrollId.getContext(); + for (int i = 0; i < context1.length; i++) { + Tuple target = context1[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { - executeQueryPhase(counter, node, target.v2()); + executeQueryPhase(i, counter, node, target.v2()); } } } }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Tuple target : scrollId.getContext()) { + Tuple[] context1 = scrollId.getContext(); + for (int i = 0; i < context1.length; i++) { + final Tuple target = context1[i]; + final int shardRequestId = i; final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - executeQueryPhase(counter, node, target.v2()); + executeQueryPhase(shardRequestId, counter, node, target.v2()); } }); } else { - executeQueryPhase(counter, node, target.v2()); + executeQueryPhase(shardRequestId, counter, node, target.v2()); } } } @@ -187,11 +192,11 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } } - private void executeQueryPhase(final AtomicInteger counter, DiscoveryNode node, final long searchId) { + private void executeQueryPhase(final int shardRequestId, final AtomicInteger counter, DiscoveryNode node, final long searchId) { searchService.sendExecuteQuery(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener() { @Override public void onResult(QuerySearchResult result) { - queryResults.put(result.shardTarget(), result); + queryResults.set(shardRequestId, result); if (counter.decrementAndGet() == 0) { executeFetchPhase(); } @@ -202,7 +207,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - addShardFailure(new ShardSearchFailure(t)); + addShardFailure(shardRequestId, new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { executeFetchPhase(); @@ -212,25 +217,26 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } private void executeFetchPhase() { - sortedShardList = searchPhaseController.sortDocs(queryResults.values()); - Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); + sortedShardList = searchPhaseController.sortDocs(queryResults); + AtomicArray docIdsToLoad = new AtomicArray(queryResults.length()); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - if (docIdsToLoad.isEmpty()) { + if (docIdsToLoad.asList().isEmpty()) { finishHim(); } - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size()); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final Map.Entry entry : docIdsToLoad.entrySet()) { - SearchShardTarget shardTarget = entry.getKey(); - ExtTIntArrayList docIds = entry.getValue(); - FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(shardTarget).id(), docIds); - DiscoveryNode node = nodes.get(shardTarget.nodeId()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + ExtTIntArrayList docIds = entry.value; + final QuerySearchResult querySearchResult = queryResults.get(entry.index); + FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds); + DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener() { @Override public void onResult(FetchSearchResult result) { - result.shardTarget(entry.getKey()); - fetchResults.put(result.shardTarget(), result); + result.shardTarget(querySearchResult.shardTarget()); + fetchResults.set(entry.index, result); if (counter.decrementAndGet() == 0) { finishHim(); } @@ -266,8 +272,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), System.currentTimeMillis() - startTime, buildShardFailures())); - searchCache.releaseQueryResults(queryResults); - searchCache.releaseFetchResults(fetchResults); } } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index 41c6029be39..2e710e04f40 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -29,8 +29,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; @@ -42,8 +41,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Map; -import java.util.Queue; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; @@ -61,16 +59,12 @@ public class TransportSearchScrollScanAction extends AbstractComponent { private final SearchPhaseController searchPhaseController; - private final TransportSearchCache searchCache; - @Inject public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportSearchCache searchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; - this.searchCache = searchCache; this.searchService = searchService; this.searchPhaseController = searchPhaseController; } @@ -89,14 +83,11 @@ public class TransportSearchScrollScanAction extends AbstractComponent { private final DiscoveryNodes nodes; - protected volatile Queue shardFailures; - - private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); + private volatile AtomicArray shardFailures; + private final AtomicArray queryFetchResults; private final AtomicInteger successfulOps; - private final AtomicInteger counter; - private final long startTime = System.currentTimeMillis(); private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { @@ -106,41 +97,48 @@ public class TransportSearchScrollScanAction extends AbstractComponent { this.nodes = clusterService.state().nodes(); this.successfulOps = new AtomicInteger(scrollId.getContext().length); this.counter = new AtomicInteger(scrollId.getContext().length); + + this.queryFetchResults = new AtomicArray(scrollId.getContext().length); } protected final ShardSearchFailure[] buildShardFailures() { - Queue localFailures = shardFailures; - if (localFailures == null) { + if (shardFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } - return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; } // we do our best to return the shard failures, but its ok if its not fully concurrently safe // we simply try and return as much as possible - protected final void addShardFailure(ShardSearchFailure failure) { + protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) { if (shardFailures == null) { - shardFailures = ConcurrentCollections.newQueue(); + shardFailures = new AtomicArray(scrollId.getContext().length); } - shardFailures.add(failure); + shardFailures.set(shardRequestId, failure); } public void start() { if (scrollId.getContext().length == 0) { final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.getAttributes().get("total_hits")), 0.0f), null, null, false); - searchCache.releaseQueryFetchResults(queryFetchResults); listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures())); return; } int localOperations = 0; - for (Tuple target : scrollId.getContext()) { + Tuple[] context = scrollId.getContext(); + for (int i = 0; i < context.length; i++) { + Tuple target = context[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null) { if (nodes.localNodeId().equals(node.id())) { localOperations++; } else { - executePhase(node, target.v2()); + executePhase(i, node, target.v2()); } } else { if (logger.isDebugEnabled()) { @@ -158,28 +156,33 @@ public class TransportSearchScrollScanAction extends AbstractComponent { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (Tuple target : scrollId.getContext()) { + Tuple[] context1 = scrollId.getContext(); + for (int i = 0; i < context1.length; i++) { + Tuple target = context1[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(node, target.v2()); + executePhase(i, node, target.v2()); } } } }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Tuple target : scrollId.getContext()) { + Tuple[] context1 = scrollId.getContext(); + for (int i = 0; i < context1.length; i++) { + final Tuple target = context1[i]; + final int shardRequestId = i; final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - executePhase(node, target.v2()); + executePhase(shardRequestId, node, target.v2()); } }); } else { - executePhase(node, target.v2()); + executePhase(shardRequestId, node, target.v2()); } } } @@ -201,11 +204,11 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } } - private void executePhase(DiscoveryNode node, final long searchId) { + private void executePhase(final int shardRequestId, DiscoveryNode node, final long searchId) { searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener() { @Override public void onResult(QueryFetchSearchResult result) { - queryFetchResults.put(result.shardTarget(), result); + queryFetchResults.set(shardRequestId, result); if (counter.decrementAndGet() == 0) { finishHim(); } @@ -216,7 +219,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - addShardFailure(new ShardSearchFailure(t)); + addShardFailure(shardRequestId, new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -234,39 +237,37 @@ public class TransportSearchScrollScanAction extends AbstractComponent { logger.debug("failed to reduce search", failure); } listener.onFailure(failure); - } finally { - searchCache.releaseQueryFetchResults(queryFetchResults); } } private void innerFinishHim() throws IOException { int numberOfHits = 0; - for (QueryFetchSearchResult shardResult : queryFetchResults.values()) { - numberOfHits += shardResult.queryResult().topDocs().scoreDocs.length; + for (AtomicArray.Entry entry : queryFetchResults.asList()) { + numberOfHits += entry.value.queryResult().topDocs().scoreDocs.length; } ShardDoc[] docs = new ShardDoc[numberOfHits]; int counter = 0; - for (QueryFetchSearchResult shardResult : queryFetchResults.values()) { - ScoreDoc[] scoreDocs = shardResult.queryResult().topDocs().scoreDocs; + for (AtomicArray.Entry entry : queryFetchResults.asList()) { + ScoreDoc[] scoreDocs = entry.value.queryResult().topDocs().scoreDocs; for (ScoreDoc scoreDoc : scoreDocs) { - docs[counter++] = new ShardScoreDoc(shardResult.shardTarget(), scoreDoc.doc, 0.0f); + docs[counter++] = new ShardScoreDoc(entry.index, scoreDoc.doc, 0.0f); } } final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults); ((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.getAttributes().get("total_hits")); - for (QueryFetchSearchResult shardResult : queryFetchResults.values()) { - if (shardResult.queryResult().topDocs().scoreDocs.length < shardResult.queryResult().size()) { + for (AtomicArray.Entry entry : queryFetchResults.asList()) { + if (entry.value.queryResult().topDocs().scoreDocs.length < entry.value.queryResult().size()) { // we found more than we want for this round, remove this from our scrolling - queryFetchResults.remove(shardResult.shardTarget()); + queryFetchResults.set(entry.index, null); } } String scrollId = null; if (request.scroll() != null) { // we rebuild the scroll id since we remove shards that we finished scrolling on - scrollId = TransportSearchHelper.buildScrollId(this.scrollId.getType(), queryFetchResults.values(), this.scrollId.getAttributes()); // continue moving the total_hits + scrollId = TransportSearchHelper.buildScrollId(this.scrollId.getType(), queryFetchResults, this.scrollId.getAttributes()); // continue moving the total_hits } listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), System.currentTimeMillis() - startTime, buildShardFailures())); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 2fadc3b9cf4..53dd3c9b638 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -33,7 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -46,8 +46,8 @@ import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; +import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -64,13 +64,10 @@ public abstract class TransportSearchTypeAction extends TransportAction listener; - private final GroupShardsIterator shardsIts; + protected final GroupShardsIterator shardsIts; protected final SearchRequest request; @@ -87,15 +84,13 @@ public abstract class TransportSearchTypeAction extends TransportAction shardFailures; - + protected final AtomicArray firstResults; + private volatile AtomicArray shardFailures; protected volatile ShardDoc[] sortedShardList; protected final long startTime = System.currentTimeMillis(); @@ -126,24 +121,28 @@ public abstract class TransportSearchTypeAction extends TransportAction(shardsIts.size()); } public void start() { request.beforeStart(); // count the local operations, and perform the non local ones int localOperations = 0; + int shardRequestId = -1; for (final ShardIterator shardIt : shardsIts) { + shardRequestId++; final ShardRouting shard = shardIt.firstOrNull(); if (shard != null) { if (shard.currentNodeId().equals(nodes.localNodeId())) { localOperations++; } else { // do the remote operation here, the localAsync flag is not relevant - performFirstPhase(shardIt); + performFirstPhase(shardRequestId, shardIt); } } else { // really, no shards active in this group - onFirstPhaseResult(null, shardIt, null); + onFirstPhaseResult(shardRequestId, null, shardIt, null); } } // we have local operations, perform them now @@ -153,11 +152,13 @@ public abstract class TransportSearchTypeAction extends TransportAction() { @Override public void onResult(FirstResult result) { - onFirstPhaseResult(shard, result, shardIt); + onFirstPhaseResult(shardRequestId, shard, result, shardIt); } @Override public void onFailure(Throwable t) { - onFirstPhaseResult(shard, shardIt, t); + onFirstPhaseResult(shardRequestId, shard, shardIt, t); } }); } } } - void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardIterator shardIt) { + void onFirstPhaseResult(int shardRequestId, ShardRouting shard, FirstResult result, ShardIterator shardIt) { result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id())); - processFirstPhaseResult(shard, result); + processFirstPhaseResult(shardRequestId, shard, result); // increment all the "future" shards to update the total ops since we some may work and some may not... // and when that happens, we break on total ops, so we must maintain them int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); successulOps.incrementAndGet(); if (xTotalOps == expectedTotalOps) { try { - moveToSecondPhase(); + innerMoveToSecondPhase(); } catch (Throwable e) { if (logger.isDebugEnabled()) { logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e); @@ -237,7 +241,7 @@ public abstract class TransportSearchTypeAction extends TransportAction localFailures = shardFailures; - if (localFailures == null) { + if (shardFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } - return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; } // we do our best to return the shard failures, but its ok if its not fully concurrently safe // we simply try and return as much as possible - protected final void addShardFailure(ShardSearchFailure failure) { + protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) { if (shardFailures == null) { - shardFailures = ConcurrentCollections.newQueue(); + shardFailures = new AtomicArray(shardsIts.size()); } - shardFailures.add(failure); + shardFailures.set(shardRequestId, failure); } /** * Releases shard targets that are not used in the docsIdsToLoad. */ - protected void releaseIrrelevantSearchContexts(Map queryResults, - Map docIdsToLoad) { + protected void releaseIrrelevantSearchContexts(AtomicArray queryResults, + AtomicArray docIdsToLoad) { if (docIdsToLoad == null) { return; } // we only release search context that we did not fetch from if we are not scrolling if (request.scroll() == null) { - for (Map.Entry entry : queryResults.entrySet()) { - if (!docIdsToLoad.containsKey(entry.getKey())) { - DiscoveryNode node = nodes.get(entry.getKey().nodeId()); + for (AtomicArray.Entry entry : queryResults.asList()) { + if (docIdsToLoad.get(entry.index) == null) { + DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); if (node != null) { // should not happen (==null) but safeguard anyhow - searchService.sendFreeContext(node, entry.getValue().id(), request); + searchService.sendFreeContext(node, entry.value.queryResult().id(), request); } } } @@ -349,7 +357,13 @@ public abstract class TransportSearchTypeAction extends TransportAction listener); - protected abstract void processFirstPhaseResult(ShardRouting shard, FirstResult result); + protected final void processFirstPhaseResult(int shardRequestId, ShardRouting shard, FirstResult result) { + firstResults.set(shardRequestId, result); + } + + final void innerMoveToSecondPhase() throws Exception { + moveToSecondPhase(); + } protected abstract void moveToSecondPhase() throws Exception; diff --git a/src/main/java/org/elasticsearch/common/lucene/search/ShardFieldDocSortedHitQueue.java b/src/main/java/org/elasticsearch/common/lucene/search/ShardFieldDocSortedHitQueue.java index 81d26416c8c..2859a18a8c5 100644 --- a/src/main/java/org/elasticsearch/common/lucene/search/ShardFieldDocSortedHitQueue.java +++ b/src/main/java/org/elasticsearch/common/lucene/search/ShardFieldDocSortedHitQueue.java @@ -126,7 +126,7 @@ public class ShardFieldDocSortedHitQueue extends PriorityQueue { // avoid random sort order that could lead to duplicates (bug #31241): if (c == 0) { // CHANGE: Add shard base tie breaking - c = docA.shardTarget().compareTo(docB.shardTarget()); + c = docA.shardRequestId() - docB.shardRequestId(); if (c == 0) { return docA.doc > docB.doc; } diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java new file mode 100644 index 00000000000..1438d93927d --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java @@ -0,0 +1,119 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * A list backed by an {@link AtomicReferenceArray} with potential null values, easily allowing + * to get the concrete values as a list using {@link #asList()}. + */ +public class AtomicArray { + + private static final AtomicArray EMPTY = new AtomicArray(0); + + @SuppressWarnings("unchecked") + public static E empty() { + return (E) EMPTY; + } + + private final AtomicReferenceArray array; + private volatile List> nonNullList; + + public AtomicArray(int size) { + array = new AtomicReferenceArray(size); + } + + /** + * The size of the expected results, including potential null values. + */ + public int length() { + return array.length(); + } + + + /** + * Sets the element at position {@code i} to the given value. + * + * @param i the index + * @param value the new value + */ + public void set(int i, E value) { + array.set(i, value); + if (nonNullList != null) { // read first, lighter, and most times it will be null... + nonNullList = null; + } + } + + /** + * Gets the current value at position {@code i}. + * + * @param i the index + * @return the current value + */ + public E get(int i) { + return array.get(i); + } + + /** + * Returns the it as a non null list, with an Entry wrapping each value allowing to + * retain its index. + */ + public List> asList() { + if (nonNullList == null) { + if (array == null || array.length() == 0) { + nonNullList = ImmutableList.of(); + } else { + List> list = new ArrayList>(array.length()); + for (int i = 0; i < array.length(); i++) { + E e = array.get(i); + if (e != null) { + list.add(new Entry(i, e)); + } + } + nonNullList = list; + } + } + return nonNullList; + } + + /** + * An entry within the array. + */ + public static class Entry { + /** + * The original index of the value within the array. + */ + public final int index; + /** + * The value. + */ + public final E value; + + public Entry(int index, E value) { + this.index = index; + this.value = value; + } + } +} diff --git a/src/main/java/org/elasticsearch/search/controller/ScoreDocQueue.java b/src/main/java/org/elasticsearch/search/controller/ScoreDocQueue.java index 881ef6a6cd3..c9a3fb58d66 100644 --- a/src/main/java/org/elasticsearch/search/controller/ScoreDocQueue.java +++ b/src/main/java/org/elasticsearch/search/controller/ScoreDocQueue.java @@ -23,8 +23,6 @@ import org.apache.lucene.util.PriorityQueue; /** *

Same as lucene {@link org.apache.lucene.search.HitQueue}. - * - * */ public class ScoreDocQueue extends PriorityQueue { @@ -34,7 +32,7 @@ public class ScoreDocQueue extends PriorityQueue { protected final boolean lessThan(ShardScoreDoc hitA, ShardScoreDoc hitB) { if (hitA.score == hitB.score) { - int c = hitA.shardTarget().compareTo(hitB.shardTarget()); + int c = hitA.shardRequestId() - hitB.shardRequestId(); if (c == 0) { return hitA.doc > hitB.doc; } diff --git a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index 392c1a26b85..8547106f203 100644 --- a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -19,7 +19,6 @@ package org.elasticsearch.search.controller; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import org.apache.lucene.index.Term; @@ -33,7 +32,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.search.ShardFieldDocSortedHitQueue; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; -import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.facet.Facet; @@ -49,7 +48,6 @@ import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.suggest.Suggest; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,12 +57,12 @@ import java.util.Map; */ public class SearchPhaseController extends AbstractComponent { - public static Ordering QUERY_RESULT_ORDERING = new Ordering() { + public static Ordering> QUERY_RESULT_ORDERING = new Ordering>() { @Override - public int compare(@Nullable QuerySearchResultProvider o1, @Nullable QuerySearchResultProvider o2) { - int i = o1.shardTarget().index().compareTo(o2.shardTarget().index()); + public int compare(@Nullable AtomicArray.Entry o1, @Nullable AtomicArray.Entry o2) { + int i = o1.value.shardTarget().index().compareTo(o2.value.shardTarget().index()); if (i == 0) { - i = o1.shardTarget().shardId() - o2.shardTarget().shardId(); + i = o1.value.shardTarget().shardId() - o2.value.shardTarget().shardId(); } return i; } @@ -86,13 +84,13 @@ public class SearchPhaseController extends AbstractComponent { return optimizeSingleShard; } - public AggregatedDfs aggregateDfs(Iterable results) { + public AggregatedDfs aggregateDfs(AtomicArray results) { Map termStatistics = XMaps.newNoNullKeysMap(); Map fieldStatistics = XMaps.newNoNullKeysMap(); long aggMaxDoc = 0; - for (DfsSearchResult result : results) { - final Term[] terms = result.terms(); - final TermStatistics[] stats = result.termStatistics(); + for (AtomicArray.Entry lEntry : results.asList()) { + final Term[] terms = lEntry.value.terms(); + final TermStatistics[] stats = lEntry.value.termStatistics(); assert terms.length == stats.length; for (int i = 0; i < terms.length; i++) { assert terms[i] != null; @@ -102,14 +100,14 @@ public class SearchPhaseController extends AbstractComponent { // totalTermFrequency is an optional statistic we need to check if either one or both // are set to -1 which means not present and then set it globally to -1 termStatistics.put(terms[i], new TermStatistics(existing.term(), - existing.docFreq() + stats[i].docFreq(), - optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq()))); + existing.docFreq() + stats[i].docFreq(), + optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq()))); } else { termStatistics.put(terms[i], stats[i]); } } - for (Map.Entry entry : result.fieldStatistics().entrySet()) { + for (Map.Entry entry : lEntry.value.fieldStatistics().entrySet()) { assert entry.getKey() != null; CollectionStatistics existing = fieldStatistics.get(entry.getKey()); if (existing != null) { @@ -124,36 +122,39 @@ public class SearchPhaseController extends AbstractComponent { fieldStatistics.put(entry.getKey(), entry.getValue()); } } - aggMaxDoc += result.maxDoc(); + aggMaxDoc += lEntry.value.maxDoc(); } return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc); } - + private static long optionalSum(long left, long right) { - return Math.min(left, right) == -1 ? -1 : left + right; + return Math.min(left, right) == -1 ? -1 : left + right; } - public ShardDoc[] sortDocs(Collection results1) { - if (results1.isEmpty()) { + public ShardDoc[] sortDocs(AtomicArray results1) { + if (results1.asList().isEmpty()) { return EMPTY; } if (optimizeSingleShard) { boolean canOptimize = false; QuerySearchResult result = null; - if (results1.size() == 1) { + int shardRequestId = -1; + if (results1.asList().size() == 1) { canOptimize = true; - result = results1.iterator().next().queryResult(); + result = results1.asList().get(0).value.queryResult(); + shardRequestId = results1.asList().get(0).index; } else { // lets see if we only got hits from a single shard, if so, we can optimize... - for (QuerySearchResultProvider queryResult : results1) { - if (queryResult.queryResult().topDocs().scoreDocs.length > 0) { + for (AtomicArray.Entry entry : results1.asList()) { + if (entry.value.queryResult().topDocs().scoreDocs.length > 0) { if (result != null) { // we already have one, can't really optimize canOptimize = false; break; } canOptimize = true; - result = queryResult.queryResult(); + result = entry.value.queryResult(); + shardRequestId = entry.index; } } } @@ -170,23 +171,22 @@ public class SearchPhaseController extends AbstractComponent { ShardDoc[] docs = new ShardDoc[resultDocsSize]; for (int i = 0; i < resultDocsSize; i++) { ScoreDoc scoreDoc = scoreDocs[result.from() + i]; - docs[i] = new ShardFieldDoc(result.shardTarget(), scoreDoc.doc, scoreDoc.score, ((FieldDoc) scoreDoc).fields); + docs[i] = new ShardFieldDoc(shardRequestId, scoreDoc.doc, scoreDoc.score, ((FieldDoc) scoreDoc).fields); } return docs; } else { ShardDoc[] docs = new ShardDoc[resultDocsSize]; for (int i = 0; i < resultDocsSize; i++) { ScoreDoc scoreDoc = scoreDocs[result.from() + i]; - docs[i] = new ShardScoreDoc(result.shardTarget(), scoreDoc.doc, scoreDoc.score); + docs[i] = new ShardScoreDoc(shardRequestId, scoreDoc.doc, scoreDoc.score); } return docs; } } } - List results = QUERY_RESULT_ORDERING.sortedCopy(results1); - - QuerySearchResultProvider queryResultProvider = results.get(0); + List> results = QUERY_RESULT_ORDERING.sortedCopy(results1.asList()); + QuerySearchResultProvider queryResultProvider = results.get(0).value; int totalNumDocs = 0; @@ -203,8 +203,8 @@ public class SearchPhaseController extends AbstractComponent { for (int i = 0; i < fieldDocs.fields.length; i++) { boolean allValuesAreNull = true; boolean resolvedField = false; - for (QuerySearchResultProvider resultProvider : results) { - for (ScoreDoc doc : resultProvider.queryResult().topDocs().scoreDocs) { + for (AtomicArray.Entry entry : results) { + for (ScoreDoc doc : entry.value.queryResult().topDocs().scoreDocs) { FieldDoc fDoc = (FieldDoc) doc; if (fDoc.fields[i] != null) { allValuesAreNull = false; @@ -227,12 +227,12 @@ public class SearchPhaseController extends AbstractComponent { queue = new ShardFieldDocSortedHitQueue(fieldDocs.fields, queueSize); // we need to accumulate for all and then filter the from - for (QuerySearchResultProvider resultProvider : results) { - QuerySearchResult result = resultProvider.queryResult(); + for (AtomicArray.Entry entry : results) { + QuerySearchResult result = entry.value.queryResult(); ScoreDoc[] scoreDocs = result.topDocs().scoreDocs; totalNumDocs += scoreDocs.length; for (ScoreDoc doc : scoreDocs) { - ShardFieldDoc nodeFieldDoc = new ShardFieldDoc(result.shardTarget(), doc.doc, doc.score, ((FieldDoc) doc).fields); + ShardFieldDoc nodeFieldDoc = new ShardFieldDoc(entry.index, doc.doc, doc.score, ((FieldDoc) doc).fields); if (queue.insertWithOverflow(nodeFieldDoc) == nodeFieldDoc) { // filled the queue, break break; @@ -241,12 +241,12 @@ public class SearchPhaseController extends AbstractComponent { } } else { queue = new ScoreDocQueue(queueSize); // we need to accumulate for all and then filter the from - for (QuerySearchResultProvider resultProvider : results) { - QuerySearchResult result = resultProvider.queryResult(); + for (AtomicArray.Entry entry : results) { + QuerySearchResult result = entry.value.queryResult(); ScoreDoc[] scoreDocs = result.topDocs().scoreDocs; totalNumDocs += scoreDocs.length; for (ScoreDoc doc : scoreDocs) { - ShardScoreDoc nodeScoreDoc = new ShardScoreDoc(result.shardTarget(), doc.doc, doc.score); + ShardScoreDoc nodeScoreDoc = new ShardScoreDoc(entry.index, doc.doc, doc.score); if (queue.insertWithOverflow(nodeScoreDoc) == nodeScoreDoc) { // filled the queue, break break; @@ -277,31 +277,31 @@ public class SearchPhaseController extends AbstractComponent { return shardDocs; } - public Map docIdsToLoad(ShardDoc[] shardDocs) { - Map result = XMaps.newMap(); + /** + * Builds an array, with potential null elements, with docs to load. + */ + public void fillDocIdsToLoad(AtomicArray docsIdsToLoad, ShardDoc[] shardDocs) { for (ShardDoc shardDoc : shardDocs) { - ExtTIntArrayList list = result.get(shardDoc.shardTarget()); + ExtTIntArrayList list = docsIdsToLoad.get(shardDoc.shardRequestId()); if (list == null) { list = new ExtTIntArrayList(); // can't be shared!, uses unsafe on it later on - result.put(shardDoc.shardTarget(), list); + docsIdsToLoad.set(shardDoc.shardRequestId(), list); } list.add(shardDoc.docId()); } - return result; } - public InternalSearchResponse merge(ShardDoc[] sortedDocs, Map queryResults, Map fetchResults) { + public InternalSearchResponse merge(ShardDoc[] sortedDocs, AtomicArray queryResults, AtomicArray fetchResults) { boolean sorted = false; int sortScoreIndex = -1; - QuerySearchResult querySearchResult; - try { - querySearchResult = Iterables.get(queryResults.values(), 0).queryResult(); - } catch (IndexOutOfBoundsException e) { - // no results, return an empty response + + if (queryResults.asList().isEmpty()) { return InternalSearchResponse.EMPTY; } + QuerySearchResult querySearchResult = queryResults.asList().get(0).value.queryResult(); + if (querySearchResult.topDocs() instanceof TopFieldDocs) { sorted = true; TopFieldDocs fieldDocs = (TopFieldDocs) querySearchResult.queryResult().topDocs(); @@ -314,7 +314,7 @@ public class SearchPhaseController extends AbstractComponent { // merge facets InternalFacets facets = null; - if (!queryResults.isEmpty()) { + if (!queryResults.asList().isEmpty()) { // we rely on the fact that the order of facets is the same on all query results if (querySearchResult.facets() != null && querySearchResult.facets().facets() != null && !querySearchResult.facets().facets().isEmpty()) { List aggregatedFacets = Lists.newArrayList(); @@ -322,8 +322,8 @@ public class SearchPhaseController extends AbstractComponent { for (Facet facet : querySearchResult.facets()) { // aggregate each facet name into a single list, and aggregate it namedFacets.clear(); - for (QuerySearchResultProvider queryResultProvider : queryResults.values()) { - for (Facet facet1 : queryResultProvider.queryResult().facets()) { + for (AtomicArray.Entry entry : queryResults.asList()) { + for (Facet facet1 : entry.value.queryResult().facets()) { if (facet.getName().equals(facet1.getName())) { namedFacets.add(facet1); } @@ -342,13 +342,14 @@ public class SearchPhaseController extends AbstractComponent { long totalHits = 0; float maxScore = Float.NEGATIVE_INFINITY; boolean timedOut = false; - for (QuerySearchResultProvider queryResultProvider : queryResults.values()) { - if (queryResultProvider.queryResult().searchTimedOut()) { + for (AtomicArray.Entry entry : queryResults.asList()) { + QuerySearchResult result = entry.value.queryResult(); + if (result.searchTimedOut()) { timedOut = true; } - totalHits += queryResultProvider.queryResult().topDocs().totalHits; - if (!Float.isNaN(queryResultProvider.queryResult().topDocs().getMaxScore())) { - maxScore = Math.max(maxScore, queryResultProvider.queryResult().topDocs().getMaxScore()); + totalHits += result.topDocs().totalHits; + if (!Float.isNaN(result.topDocs().getMaxScore())) { + maxScore = Math.max(maxScore, result.topDocs().getMaxScore()); } } if (Float.isInfinite(maxScore)) { @@ -356,15 +357,15 @@ public class SearchPhaseController extends AbstractComponent { } // clean the fetch counter - for (FetchSearchResultProvider fetchSearchResultProvider : fetchResults.values()) { - fetchSearchResultProvider.fetchResult().initCounter(); + for (AtomicArray.Entry entry : fetchResults.asList()) { + entry.value.fetchResult().initCounter(); } // merge hits List hits = new ArrayList(); - if (!fetchResults.isEmpty()) { + if (!fetchResults.asList().isEmpty()) { for (ShardDoc shardDoc : sortedDocs) { - FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardTarget()); + FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardRequestId()); if (fetchResultProvider == null) { continue; } @@ -390,13 +391,12 @@ public class SearchPhaseController extends AbstractComponent { // merge suggest results Suggest suggest = null; - if (!queryResults.isEmpty()) { - + if (!queryResults.asList().isEmpty()) { final Map> groupedSuggestions = new HashMap>(); boolean hasSuggestions = false; - for (QuerySearchResultProvider resultProvider : queryResults.values()) { - Suggest shardResult = resultProvider.queryResult().suggest(); - + for (AtomicArray.Entry entry : queryResults.asList()) { + Suggest shardResult = entry.value.queryResult().queryResult().suggest(); + if (shardResult == null) { continue; } @@ -410,5 +410,5 @@ public class SearchPhaseController extends AbstractComponent { InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore); return new InternalSearchResponse(searchHits, facets, suggest, timedOut); } - + } diff --git a/src/main/java/org/elasticsearch/search/controller/ShardDoc.java b/src/main/java/org/elasticsearch/search/controller/ShardDoc.java index c4265e809fd..d1269385bed 100644 --- a/src/main/java/org/elasticsearch/search/controller/ShardDoc.java +++ b/src/main/java/org/elasticsearch/search/controller/ShardDoc.java @@ -26,7 +26,7 @@ import org.elasticsearch.search.SearchShardTarget; */ public interface ShardDoc { - SearchShardTarget shardTarget(); + int shardRequestId(); int docId(); diff --git a/src/main/java/org/elasticsearch/search/controller/ShardFieldDoc.java b/src/main/java/org/elasticsearch/search/controller/ShardFieldDoc.java index a3c7d20df76..59454d1f0e6 100644 --- a/src/main/java/org/elasticsearch/search/controller/ShardFieldDoc.java +++ b/src/main/java/org/elasticsearch/search/controller/ShardFieldDoc.java @@ -27,21 +27,21 @@ import org.elasticsearch.search.SearchShardTarget; */ public class ShardFieldDoc extends FieldDoc implements ShardDoc { - private final SearchShardTarget shardTarget; + private final int shardRequestId; - public ShardFieldDoc(SearchShardTarget shardTarget, int doc, float score) { + public ShardFieldDoc(int shardRequestId, int doc, float score) { super(doc, score); - this.shardTarget = shardTarget; + this.shardRequestId = shardRequestId; } - public ShardFieldDoc(SearchShardTarget shardTarget, int doc, float score, Object[] fields) { + public ShardFieldDoc(int shardRequestId, int doc, float score, Object[] fields) { super(doc, score, fields); - this.shardTarget = shardTarget; + this.shardRequestId = shardRequestId; } @Override - public SearchShardTarget shardTarget() { - return this.shardTarget; + public int shardRequestId() { + return this.shardRequestId; } @Override diff --git a/src/main/java/org/elasticsearch/search/controller/ShardScoreDoc.java b/src/main/java/org/elasticsearch/search/controller/ShardScoreDoc.java index c0be8b855a1..b474c6d4bee 100644 --- a/src/main/java/org/elasticsearch/search/controller/ShardScoreDoc.java +++ b/src/main/java/org/elasticsearch/search/controller/ShardScoreDoc.java @@ -20,23 +20,22 @@ package org.elasticsearch.search.controller; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.search.SearchShardTarget; /** * */ public class ShardScoreDoc extends ScoreDoc implements ShardDoc { - private final SearchShardTarget shardTarget; + private final int shardRequestId; - public ShardScoreDoc(SearchShardTarget shardTarget, int doc, float score) { + public ShardScoreDoc(int shardRequestId, int doc, float score) { super(doc, score); - this.shardTarget = shardTarget; + this.shardRequestId = shardRequestId; } @Override - public SearchShardTarget shardTarget() { - return this.shardTarget; + public int shardRequestId() { + return this.shardRequestId; } @Override