search reducer to use atomic reference arrays

Move away from maps for correlating responses from different shards; instead, correlate each response with an incremental integer, the shardRequestId, which is unique within a specific search request.

This removes the need for maps (or ConcurrentHashMaps) altogether: we can simply use atomic reference arrays, which rely on volatile reads and writes for visibility. It also removes the need to cache the heavy result data structures, since we no longer keep them around. A sketch of the idea follows below.
Shay Banon 2013-07-14 00:51:54 +02:00
parent 2762fed04f
commit 8e0d23b147
20 changed files with 522 additions and 532 deletions
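The AtomicArray type the diff switches to lives in org.elasticsearch.common.util.concurrent. As a rough illustration of the pattern (a minimal sketch inferred from how length/set/get/asList are used in the hunks below, not the actual Elasticsearch implementation; the AtomicArraySketch and Entry names are stand-ins), it wraps a java.util.concurrent.atomic.AtomicReferenceArray and indexes shard results by shardRequestId:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch, not the real org.elasticsearch.common.util.concurrent.AtomicArray:
// each shard response is stored in the slot addressed by its shardRequestId, and the
// volatile semantics of AtomicReferenceArray.set/get make a result written by one
// thread visible to the thread that later reduces the results.
public class AtomicArraySketch<E> {

    // index/value pair, mirroring the entry.index / entry.value access pattern in the diff
    public static class Entry<E> {
        public final int index;
        public final E value;

        Entry(int index, E value) {
            this.index = index;
            this.value = value;
        }
    }

    private final AtomicReferenceArray<E> array;

    public AtomicArraySketch(int size) {
        this.array = new AtomicReferenceArray<E>(size);
    }

    public int length() {
        return array.length();
    }

    // volatile write, keyed by the per-request shard index instead of a SearchShardTarget
    public void set(int shardRequestId, E value) {
        array.set(shardRequestId, value);
    }

    // volatile read
    public E get(int shardRequestId) {
        return array.get(shardRequestId);
    }

    // snapshot of the non-null slots; a failed shard simply leaves its slot null,
    // so nothing ever has to be removed from a shared structure
    public List<Entry<E>> asList() {
        List<Entry<E>> entries = new ArrayList<Entry<E>>();
        for (int i = 0; i < array.length(); i++) {
            E value = array.get(i);
            if (value != null) {
                entries.add(new Entry<E>(i, value));
            }
        }
        return entries;
    }
}

Call sites then change from queryFetchResults.put(result.shardTarget(), result) on a concurrent map to queryFetchResults.set(shardRequestId, result), as seen throughout the hunks below.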

ActionModule.java

@@ -231,7 +231,6 @@ public class ActionModule extends AbstractModule {
registerAction(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class,
TransportIndexDeleteByQueryAction.class, TransportShardDeleteByQueryAction.class);
registerAction(SearchAction.INSTANCE, TransportSearchAction.class,
TransportSearchCache.class,
TransportSearchDfsQueryThenFetchAction.class,
TransportSearchQueryThenFetchAction.class,
TransportSearchDfsQueryAndFetchAction.class,

TransportSearchCache.java (deleted)

@@ -1,99 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
/**
*
*/
public class TransportSearchCache {
private final Queue<Collection<DfsSearchResult>> cacheDfsResults = ConcurrentCollections.newQueue();
private final Queue<Map<SearchShardTarget, QuerySearchResultProvider>> cacheQueryResults = ConcurrentCollections.newQueue();
private final Queue<Map<SearchShardTarget, FetchSearchResult>> cacheFetchResults = ConcurrentCollections.newQueue();
private final Queue<Map<SearchShardTarget, QueryFetchSearchResult>> cacheQueryFetchResults = ConcurrentCollections.newQueue();
public Collection<DfsSearchResult> obtainDfsResults() {
Collection<DfsSearchResult> dfsSearchResults;
while ((dfsSearchResults = cacheDfsResults.poll()) == null) {
Queue<DfsSearchResult> offer = ConcurrentCollections.newQueue();
cacheDfsResults.offer(offer);
}
return dfsSearchResults;
}
public void releaseDfsResults(Collection<DfsSearchResult> dfsResults) {
dfsResults.clear();
cacheDfsResults.offer(dfsResults);
}
public Map<SearchShardTarget, QuerySearchResultProvider> obtainQueryResults() {
Map<SearchShardTarget, QuerySearchResultProvider> queryResults;
while ((queryResults = cacheQueryResults.poll()) == null) {
cacheQueryResults.offer(ConcurrentCollections.<SearchShardTarget, QuerySearchResultProvider>newConcurrentMap());
}
return queryResults;
}
public void releaseQueryResults(Map<SearchShardTarget, QuerySearchResultProvider> queryResults) {
queryResults.clear();
cacheQueryResults.offer(queryResults);
}
public Map<SearchShardTarget, FetchSearchResult> obtainFetchResults() {
Map<SearchShardTarget, FetchSearchResult> fetchResults;
while ((fetchResults = cacheFetchResults.poll()) == null) {
cacheFetchResults.offer(ConcurrentCollections.<SearchShardTarget, FetchSearchResult>newConcurrentMap());
}
return fetchResults;
}
public void releaseFetchResults(Map<SearchShardTarget, FetchSearchResult> fetchResults) {
fetchResults.clear();
cacheFetchResults.offer(fetchResults);
}
public Map<SearchShardTarget, QueryFetchSearchResult> obtainQueryFetchResults() {
Map<SearchShardTarget, QueryFetchSearchResult> fetchResults;
while ((fetchResults = cacheQueryFetchResults.poll()) == null) {
cacheQueryFetchResults.offer(ConcurrentCollections.<SearchShardTarget, QueryFetchSearchResult>newConcurrentMap());
}
return fetchResults;
}
public void releaseQueryFetchResults(Map<SearchShardTarget, QueryFetchSearchResult> fetchResults) {
fetchResults.clear();
cacheQueryFetchResults.offer(fetchResults);
}
}

TransportSearchCountAction.java

@@ -19,16 +19,14 @@
package org.elasticsearch.action.search.type;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@@ -37,11 +35,8 @@ import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
/**
@@ -51,8 +46,8 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
@Inject
public TransportSearchCountAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController);
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, searchService, searchPhaseController);
}
@Override
@@ -62,8 +57,6 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private final Map<SearchShardTarget, QuerySearchResultProvider> queryFetchResults = searchCache.obtainQueryResults();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
}
@@ -78,21 +71,15 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
searchService.sendExecuteQuery(node, request, listener);
}
@Override
protected void processFirstPhaseResult(ShardRouting shard, QuerySearchResult result) {
queryFetchResults.put(result.shardTarget(), result);
}
@Override
protected void moveToSecondPhase() throws Exception {
// no need to sort, since we know we have no hits back
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryFetchResults, ImmutableMap.<SearchShardTarget, FetchSearchResultProvider>of());
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null);
scrollId = buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryResults(queryFetchResults);
}
}

TransportSearchDfsQueryAndFetchAction.java

@@ -23,10 +23,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@@ -38,12 +37,8 @@ import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
/**
*
*/
@@ -51,8 +46,8 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
@Inject
public TransportSearchDfsQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController);
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, searchService, searchPhaseController);
}
@Override
@@ -62,13 +57,11 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
private class AsyncAction extends BaseAsyncAction<DfsSearchResult> {
private final Collection<DfsSearchResult> dfsResults = searchCache.obtainDfsResults();
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
queryFetchResults = new AtomicArray<QueryFetchSearchResult>(firstResults.length());
}
@Override
@@ -81,24 +74,20 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
searchService.sendExecuteDfs(node, request, listener);
}
@Override
protected void processFirstPhaseResult(ShardRouting shard, DfsSearchResult result) {
dfsResults.add(result);
}
@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsResults);
final AtomicInteger counter = new AtomicInteger(dfsResults.size());
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
int localOperations = 0;
for (final DfsSearchResult dfsResult : dfsResults) {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
if (localOperations > 0) {
@@ -106,18 +95,20 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final DfsSearchResult dfsResult : dfsResults) {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final DfsSearchResult dfsResult : dfsResults) {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
final DfsSearchResult dfsResult = entry.value;
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
@@ -125,11 +116,11 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
});
} else {
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
}
@@ -137,12 +128,12 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
}
}
void executeSecondPhase(final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
void executeSecondPhase(final int shardRequestId, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new SearchServiceListener<QueryFetchSearchResult>() {
@Override
public void onResult(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryFetchResults.put(result.shardTarget(), result);
queryFetchResults.set(shardRequestId, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@@ -153,7 +144,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@@ -172,17 +163,16 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
}
listener.onFailure(failure);
} finally {
searchCache.releaseDfsResults(dfsResults);
searchCache.releaseQueryFetchResults(queryFetchResults);
//
}
}
void innerFinishHim() throws Exception {
sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), dfsResults, null);
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}

TransportSearchDfsQueryThenFetchAction.java

@@ -23,10 +23,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -39,11 +39,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -53,8 +50,8 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
@Inject
public TransportSearchDfsQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController);
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, searchService, searchPhaseController);
}
@Override
@@ -64,16 +61,15 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
private class AsyncAction extends BaseAsyncAction<DfsSearchResult> {
private final Collection<DfsSearchResult> dfsResults = searchCache.obtainDfsResults();
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private volatile Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<ExtTIntArrayList> docIdsToLoad;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
queryResults = new AtomicArray<QuerySearchResult>(firstResults.length());
fetchResults = new AtomicArray<FetchSearchResult>(firstResults.length());
docIdsToLoad = new AtomicArray<ExtTIntArrayList>(firstResults.length());
}
@Override
@@ -86,25 +82,20 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
searchService.sendExecuteDfs(node, request, listener);
}
@Override
protected void processFirstPhaseResult(ShardRouting shard, DfsSearchResult result) {
dfsResults.add(result);
}
@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsResults);
final AtomicInteger counter = new AtomicInteger(dfsResults.size());
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
int localOperations = 0;
for (final DfsSearchResult dfsResult : dfsResults) {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(dfsResult, counter, querySearchRequest, node);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
}
@@ -113,18 +104,20 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final DfsSearchResult dfsResult : dfsResults) {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(dfsResult, counter, querySearchRequest, node);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final DfsSearchResult dfsResult : dfsResults) {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
final DfsSearchResult dfsResult = entry.value;
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
@@ -132,11 +125,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQuery(dfsResult, counter, querySearchRequest, node);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
});
} else {
executeQuery(dfsResult, counter, querySearchRequest, node);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
}
}
@@ -144,12 +137,12 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
void executeQuery(final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
void executeQuery(final int shardRequestId, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener<QuerySearchResult>() {
@Override
public void onResult(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryResults.put(result.shardTarget(), result);
queryResults.set(shardRequestId, result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
@@ -160,7 +153,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
@@ -178,24 +171,24 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
void innerExecuteFetchPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults.values());
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
this.docIdsToLoad = docIdsToLoad;
sortedShardList = searchPhaseController.sortDocs(queryResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.isEmpty()) {
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
int localOperations = 0;
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
for (final AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
@@ -204,30 +197,32 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
for (final AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
final DiscoveryNode node = nodes.get(entry.getKey().nodeId());
for (final AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
final QuerySearchResult queryResult = queryResults.get(entry.index);
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue());
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
}
@@ -235,12 +230,12 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
void executeFetch(final int shardRequestId, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.put(result.shardTarget(), result);
fetchResults.set(shardRequestId, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@@ -251,7 +246,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@@ -264,16 +259,13 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
try {
innerFinishHim();
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
searchCache.releaseDfsResults(dfsResults);
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
}
}
@@ -281,7 +273,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults, null);
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}

TransportSearchHelper.java

@@ -21,7 +21,6 @@ package org.elasticsearch.action.search.type;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.UnicodeUtil;
@@ -35,12 +34,12 @@ import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ShardSearchRequest;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
/**
@@ -59,7 +58,7 @@ public abstract class TransportSearchHelper {
return new InternalScrollSearchRequest(request, id);
}
public static String buildScrollId(SearchType searchType, Collection<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
public static String buildScrollId(SearchType searchType, AtomicArray<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) {
return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults, attributes);
} else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) {
@@ -71,10 +70,11 @@
}
}
public static String buildScrollId(String type, Collection<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
public static String buildScrollId(String type, AtomicArray<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
StringBuilder sb = new StringBuilder().append(type).append(';');
sb.append(searchPhaseResults.size()).append(';');
for (SearchPhaseResult searchPhaseResult : searchPhaseResults) {
sb.append(searchPhaseResults.asList().size()).append(';');
for (AtomicArray.Entry<? extends SearchPhaseResult> entry : searchPhaseResults.asList()) {
SearchPhaseResult searchPhaseResult = entry.value;
sb.append(searchPhaseResult.id()).append(':').append(searchPhaseResult.shardTarget().nodeId()).append(';');
}
if (attributes == null) {

TransportSearchQueryAndFetchAction.java

@@ -20,14 +20,13 @@
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@@ -36,7 +35,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.io.IOException;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
@@ -47,8 +46,8 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
@Inject
public TransportSearchQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController);
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, searchService, searchPhaseController);
}
@Override
@@ -58,9 +57,6 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
private class AsyncAction extends BaseAsyncAction<QueryFetchSearchResult> {
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
}
@@ -76,20 +72,26 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
}
@Override
protected void processFirstPhaseResult(ShardRouting shard, QueryFetchSearchResult result) {
queryFetchResults.put(result.shardTarget(), result);
protected void moveToSecondPhase() throws Exception {
try {
innerFinishHim();
} catch (Throwable e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
}
}
@Override
protected void moveToSecondPhase() throws Exception {
sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
private void innerFinishHim() throws IOException {
sortedShardList = searchPhaseController.sortDocs(firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null);
scrollId = buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryFetchResults(queryFetchResults);
}
}
}

TransportSearchQueryThenFetchAction.java

@@ -23,10 +23,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -36,10 +36,8 @@ import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -49,8 +47,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
@Inject
public TransportSearchQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController);
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, searchService, searchPhaseController);
}
@Override
@@ -60,14 +58,13 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private volatile Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad;
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<ExtTIntArrayList> docIdsToLoad;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
fetchResults = new AtomicArray<FetchSearchResult>(firstResults.length());
docIdsToLoad = new AtomicArray<ExtTIntArrayList>(firstResults.length());
}
@Override
@@ -80,32 +77,27 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
searchService.sendExecuteQuery(node, request, listener);
}
@Override
protected void processFirstPhaseResult(ShardRouting shard, QuerySearchResult result) {
queryResults.put(result.shardTarget(), result);
}
@Override
protected void moveToSecondPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults.values());
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
this.docIdsToLoad = docIdsToLoad;
sortedShardList = searchPhaseController.sortDocs(firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.isEmpty()) {
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
int localOperations = 0;
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
for (AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
@@ -114,30 +106,32 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
for (AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
final DiscoveryNode node = nodes.get(entry.getKey().nodeId());
for (final AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
final QuerySearchResult queryResult = firstResults.get(entry.index);
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(entry.getKey()).id(), entry.getValue());
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
}
@@ -145,12 +139,12 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
}
}
void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
void executeFetch(final int shardRequestId, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.put(result.shardTarget(), result);
fetchResults.set(shardRequestId, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@@ -161,7 +155,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@@ -180,17 +174,15 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
}
listener.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
}
}
void innerFinishHim() throws Exception {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values(), null);
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}

TransportSearchScanAction.java

@@ -25,10 +25,9 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@@ -37,19 +36,16 @@ import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
public class TransportSearchScanAction extends TransportSearchTypeAction {
@Inject
public TransportSearchScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController);
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, searchService, searchPhaseController);
}
@Override
@@ -59,8 +55,6 @@ public class TransportSearchScanAction extends TransportSearchTypeAction {
private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
}
@@ -75,20 +69,14 @@ public class TransportSearchScanAction extends TransportSearchTypeAction {
searchService.sendExecuteScan(node, request, listener);
}
@Override
protected void processFirstPhaseResult(ShardRouting shard, QuerySearchResult result) {
queryResults.put(result.shardTarget(), result);
}
@Override
protected void moveToSecondPhase() throws Exception {
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryResults, ImmutableMap.<SearchShardTarget, FetchSearchResultProvider>of());
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryResults.values(), ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryResults(queryResults);
}
}

TransportSearchScrollQueryAndFetchAction.java

@@ -28,8 +28,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@@ -38,8 +37,7 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.Queue;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
@@ -57,16 +55,12 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
private final SearchPhaseController searchPhaseController;
private final TransportSearchCache searchCache;
@Inject
public TransportSearchScrollQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@@ -85,12 +79,10 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
private final DiscoveryNodes nodes;
private volatile Queue<ShardSearchFailure> shardFailures;
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private final AtomicInteger successfulOps;
private final AtomicInteger counter;
private final long startTime = System.currentTimeMillis();
@@ -102,23 +94,29 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
this.counter = new AtomicInteger(scrollId.getContext().length);
this.queryFetchResults = new AtomicArray<QueryFetchSearchResult>(scrollId.getContext().length);
}
protected final ShardSearchFailure[] buildShardFailures() {
Queue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = ConcurrentCollections.newQueue();
shardFailures = new AtomicArray<ShardSearchFailure>(scrollId.getContext().length);
}
shardFailures.add(failure);
shardFailures.set(shardRequestId, failure);
}
public void start() {
@@ -128,13 +126,15 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
}
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
Tuple<String, Long> target = context[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executePhase(node, target.v2());
executePhase(i, node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
@@ -152,28 +152,33 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
Tuple<String, Long> target = context1[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(node, target.v2());
executePhase(i, node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardRequestId = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(node, target.v2());
executePhase(shardRequestId, node, target.v2());
}
});
} else {
executePhase(node, target.v2());
executePhase(shardRequestId, node, target.v2());
}
}
}
@@ -195,11 +200,11 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
}
}
private void executePhase(DiscoveryNode node, final long searchId) {
private void executePhase(final int shardRequestId, DiscoveryNode node, final long searchId) {
searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override
public void onResult(QueryFetchSearchResult result) {
queryFetchResults.put(result.shardTarget(), result);
queryFetchResults.set(shardRequestId, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@@ -210,7 +215,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(new ShardSearchFailure(t));
addShardFailure(shardRequestId, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@@ -228,13 +233,12 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
}
private void innerFinishHim() {
ShardDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
ShardDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
}
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures()));
}

TransportSearchScrollQueryThenFetchAction.java

@@ -29,8 +29,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@@ -39,11 +38,9 @@ import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.Queue;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
@@ -61,16 +58,12 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
private final SearchPhaseController searchPhaseController;
private final TransportSearchCache searchCache;
@Inject
public TransportSearchScrollQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@@ -89,11 +82,9 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
private final DiscoveryNodes nodes;
protected volatile Queue<ShardSearchFailure> shardFailures;
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private volatile AtomicArray<ShardSearchFailure> shardFailures;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
private volatile ShardDoc[] sortedShardList;
@@ -107,23 +98,30 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
this.queryResults = new AtomicArray<QuerySearchResult>(scrollId.getContext().length);
this.fetchResults = new AtomicArray<FetchSearchResult>(scrollId.getContext().length);
}
protected final ShardSearchFailure[] buildShardFailures() {
Queue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but it's ok if it's not fully concurrently safe
// we simply try to return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = ConcurrentCollections.newQueue();
shardFailures = new AtomicArray<ShardSearchFailure>(scrollId.getContext().length);
}
shardFailures.add(failure);
shardFailures.set(shardRequestId, failure);
}
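The null check above is deliberately racy: two shards failing at the same instant can both observe shardFailures == null and overwrite each other's freshly allocated array, losing at most one failure, which the "best effort" comment accepts. If that trade-off were not acceptable, a CAS-published array would close the window; a hedged sketch, not part of the commit (assumes java.util.concurrent.atomic.AtomicReference):

private final AtomicReference<AtomicArray<ShardSearchFailure>> shardFailuresRef =
        new AtomicReference<AtomicArray<ShardSearchFailure>>();

void addShardFailureSafe(int shardRequestId, ShardSearchFailure failure) {
    if (shardFailuresRef.get() == null) {
        // only the winning CAS publishes an array; losers fall through and reuse it
        shardFailuresRef.compareAndSet(null, new AtomicArray<ShardSearchFailure>(scrollId.getContext().length));
    }
    shardFailuresRef.get().set(shardRequestId, failure);
}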
public void start() {
@@ -134,13 +132,15 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length);
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
Tuple<String, Long> target = context[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executeQueryPhase(counter, node, target.v2());
executeQueryPhase(i, counter, node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
@@ -158,28 +158,33 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
Tuple<String, Long> target = context1[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executeQueryPhase(counter, node, target.v2());
executeQueryPhase(i, counter, node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardRequestId = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQueryPhase(counter, node, target.v2());
executeQueryPhase(shardRequestId, counter, node, target.v2());
}
});
} else {
executeQueryPhase(counter, node, target.v2());
executeQueryPhase(shardRequestId, counter, node, target.v2());
}
}
}
@@ -187,11 +192,11 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
}
private void executeQueryPhase(final AtomicInteger counter, DiscoveryNode node, final long searchId) {
private void executeQueryPhase(final int shardRequestId, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
searchService.sendExecuteQuery(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QuerySearchResult>() {
@Override
public void onResult(QuerySearchResult result) {
queryResults.put(result.shardTarget(), result);
queryResults.set(shardRequestId, result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
@@ -202,7 +207,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(new ShardSearchFailure(t));
addShardFailure(shardRequestId, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
@@ -212,25 +217,26 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
private void executeFetchPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults.values());
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
sortedShardList = searchPhaseController.sortDocs(queryResults);
AtomicArray<ExtTIntArrayList> docIdsToLoad = new AtomicArray<ExtTIntArrayList>(queryResults.length());
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.isEmpty()) {
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
}
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
SearchShardTarget shardTarget = entry.getKey();
ExtTIntArrayList docIds = entry.getValue();
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResults.get(shardTarget).id(), docIds);
DiscoveryNode node = nodes.get(shardTarget.nodeId());
for (final AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
ExtTIntArrayList docIds = entry.value;
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
result.shardTarget(entry.getKey());
fetchResults.put(result.shardTarget(), result);
result.shardTarget(querySearchResult.shardTarget());
fetchResults.set(entry.index, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@@ -266,8 +272,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures()));
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
}
}
}
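Note how the fetch phase above no longer needs a map lookup to route requests: queryResults, docIdsToLoad and fetchResults all share the same shardRequestId index space, so the target node is recovered from the stored query result itself and the fetch result is written back at the same index. The invariant, in sketch form with details elided:

for (AtomicArray.Entry<ExtTIntArrayList> entry : docIdsToLoad.asList()) {
    QuerySearchResult queryResult = queryResults.get(entry.index);      // same index on both arrays
    DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); // target comes from the result
    // ... send the fetch request; on response: fetchResults.set(entry.index, result);
}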

View File

@@ -29,8 +29,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@@ -42,8 +41,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
@@ -61,16 +59,12 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
private final SearchPhaseController searchPhaseController;
private final TransportSearchCache searchCache;
@Inject
public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@@ -89,14 +83,11 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
private final DiscoveryNodes nodes;
protected volatile Queue<ShardSearchFailure> shardFailures;
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private final AtomicInteger successfulOps;
private final AtomicInteger counter;
private final long startTime = System.currentTimeMillis();
private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
@@ -106,41 +97,48 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
this.counter = new AtomicInteger(scrollId.getContext().length);
this.queryFetchResults = new AtomicArray<QueryFetchSearchResult>(scrollId.getContext().length);
}
protected final ShardSearchFailure[] buildShardFailures() {
Queue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but it's ok if it's not fully concurrently safe
// we simply try to return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = ConcurrentCollections.newQueue();
shardFailures = new AtomicArray<ShardSearchFailure>(scrollId.getContext().length);
}
shardFailures.add(failure);
shardFailures.set(shardRequestId, failure);
}
public void start() {
if (scrollId.getContext().length == 0) {
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.getAttributes().get("total_hits")), 0.0f), null, null, false);
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures()));
return;
}
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
Tuple<String, Long> target = context[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executePhase(node, target.v2());
executePhase(i, node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
@@ -158,28 +156,33 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
Tuple<String, Long> target = context1[i];
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(node, target.v2());
executePhase(i, node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.getContext()) {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardRequestId = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(node, target.v2());
executePhase(shardRequestId, node, target.v2());
}
});
} else {
executePhase(node, target.v2());
executePhase(shardRequestId, node, target.v2());
}
}
}
@@ -201,11 +204,11 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
}
}
private void executePhase(DiscoveryNode node, final long searchId) {
private void executePhase(final int shardRequestId, DiscoveryNode node, final long searchId) {
searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override
public void onResult(QueryFetchSearchResult result) {
queryFetchResults.put(result.shardTarget(), result);
queryFetchResults.set(shardRequestId, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@@ -216,7 +219,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(new ShardSearchFailure(t));
addShardFailure(shardRequestId, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@@ -234,39 +237,37 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
logger.debug("failed to reduce search", failure);
}
listener.onFailure(failure);
} finally {
searchCache.releaseQueryFetchResults(queryFetchResults);
}
}
private void innerFinishHim() throws IOException {
int numberOfHits = 0;
for (QueryFetchSearchResult shardResult : queryFetchResults.values()) {
numberOfHits += shardResult.queryResult().topDocs().scoreDocs.length;
for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
numberOfHits += entry.value.queryResult().topDocs().scoreDocs.length;
}
ShardDoc[] docs = new ShardDoc[numberOfHits];
int counter = 0;
for (QueryFetchSearchResult shardResult : queryFetchResults.values()) {
ScoreDoc[] scoreDocs = shardResult.queryResult().topDocs().scoreDocs;
for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
ScoreDoc[] scoreDocs = entry.value.queryResult().topDocs().scoreDocs;
for (ScoreDoc scoreDoc : scoreDocs) {
docs[counter++] = new ShardScoreDoc(shardResult.shardTarget(), scoreDoc.doc, 0.0f);
docs[counter++] = new ShardScoreDoc(entry.index, scoreDoc.doc, 0.0f);
}
}
final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults);
((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.getAttributes().get("total_hits"));
for (QueryFetchSearchResult shardResult : queryFetchResults.values()) {
if (shardResult.queryResult().topDocs().scoreDocs.length < shardResult.queryResult().size()) {
for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
if (entry.value.queryResult().topDocs().scoreDocs.length < entry.value.queryResult().size()) {
// this shard returned fewer hits than requested, so it is exhausted; remove it from our scrolling
queryFetchResults.remove(shardResult.shardTarget());
queryFetchResults.set(entry.index, null);
}
}
String scrollId = null;
if (request.scroll() != null) {
// we rebuild the scroll id since we remove shards that we finished scrolling on
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.getType(), queryFetchResults.values(), this.scrollId.getAttributes()); // continue moving the total_hits
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.getType(), queryFetchResults, this.scrollId.getAttributes()); // continue moving the total_hits
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures()));
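Setting a slot back to null is how the scan phase now drops shards that finished scrolling: asList() skips null entries, so buildScrollId only carries the contexts that are still live. A standalone illustration, assuming only the AtomicArray class added in this commit:

AtomicArray<String> contexts = new AtomicArray<String>(3);
contexts.set(0, "ctx-0");
contexts.set(1, "ctx-1");
contexts.set(2, "ctx-2");
contexts.set(1, null); // shard 1 is exhausted
for (AtomicArray.Entry<String> e : contexts.asList()) {
    System.out.println(e.index + " -> " + e.value); // prints entries 0 and 2 only
}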

View File

@@ -33,7 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@@ -46,8 +46,8 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -64,13 +64,10 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected final SearchPhaseController searchPhaseController;
protected final TransportSearchCache searchCache;
public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool);
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@@ -79,7 +76,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected final ActionListener<SearchResponse> listener;
private final GroupShardsIterator shardsIts;
protected final GroupShardsIterator shardsIts;
protected final SearchRequest request;
@@ -87,15 +84,13 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected final DiscoveryNodes nodes;
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;
protected final AtomicInteger successulOps = new AtomicInteger();
private final AtomicInteger totalOps = new AtomicInteger();
private volatile Queue<ShardSearchFailure> shardFailures;
protected final AtomicArray<FirstResult> firstResults;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
protected volatile ShardDoc[] sortedShardList;
protected final long startTime = System.currentTimeMillis();
@@ -126,24 +121,28 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
// no shards to search on...
throw new SearchPhaseExecutionException("initial", "No indices / shards to search on, requested indices are " + Arrays.toString(request.indices()), buildShardFailures());
}
firstResults = new AtomicArray<FirstResult>(shardsIts.size());
}
public void start() {
request.beforeStart();
// count the local operations, and perform the non local ones
int localOperations = 0;
int shardRequestId = -1;
for (final ShardIterator shardIt : shardsIts) {
shardRequestId++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
// do the remote operation here, the localAsync flag is not relevant
performFirstPhase(shardIt);
performFirstPhase(shardRequestId, shardIt);
}
} else {
// really, no shards active in this group
onFirstPhaseResult(null, shardIt, null);
onFirstPhaseResult(shardRequestId, null, shardIt, null);
}
}
// we have local operations, perform them now
@@ -153,11 +152,13 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
int shardRequestId = -1;
for (final ShardIterator shardIt : shardsIts) {
shardRequestId++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performFirstPhase(shardIt);
performFirstPhase(shardRequestId, shardIt);
}
}
}
@@ -168,7 +169,10 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
if (localAsync) {
request.beforeLocalFork();
}
shardRequestId = -1;
for (final ShardIterator shardIt : shardsIts) {
shardRequestId++;
final int fShardRequestId = shardRequestId;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
@@ -176,11 +180,11 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
performFirstPhase(shardIt);
performFirstPhase(fShardRequestId, shardIt);
}
});
} else {
performFirstPhase(shardIt);
performFirstPhase(fShardRequestId, shardIt);
}
}
}
@@ -189,45 +193,45 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
void performFirstPhase(final ShardIterator shardIt) {
performFirstPhase(shardIt, shardIt.nextOrNull());
void performFirstPhase(final int shardRequestId, final ShardIterator shardIt) {
performFirstPhase(shardRequestId, shardIt, shardIt.nextOrNull());
}
void performFirstPhase(final ShardIterator shardIt, final ShardRouting shard) {
void performFirstPhase(final int shardRequestId, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(null, shardIt, null);
onFirstPhaseResult(shardRequestId, null, shardIt, null);
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shard, shardIt, null);
onFirstPhaseResult(shardRequestId, shard, shardIt, null);
} else {
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime), new SearchServiceListener<FirstResult>() {
@Override
public void onResult(FirstResult result) {
onFirstPhaseResult(shard, result, shardIt);
onFirstPhaseResult(shardRequestId, shard, result, shardIt);
}
@Override
public void onFailure(Throwable t) {
onFirstPhaseResult(shard, shardIt, t);
onFirstPhaseResult(shardRequestId, shard, shardIt, t);
}
});
}
}
}
void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardIterator shardIt) {
void onFirstPhaseResult(int shardRequestId, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
processFirstPhaseResult(shard, result);
processFirstPhaseResult(shardRequestId, shard, result);
// increment all the "future" shards to update the total ops, since some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
successulOps.incrementAndGet();
if (xTotalOps == expectedTotalOps) {
try {
moveToSecondPhase();
innerMoveToSecondPhase();
} catch (Throwable e) {
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
@@ -237,7 +241,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
void onFirstPhaseResult(@Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
void onFirstPhaseResult(final int shardRequestId, @Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
if (totalOps.incrementAndGet() == expectedTotalOps) {
// t is null when there is no next active shard...
if (logger.isDebugEnabled()) {
@@ -252,16 +256,16 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
// no more shards, add a failure
if (t == null) {
// no active shards
addShardFailure(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
addShardFailure(shardRequestId, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
} else {
addShardFailure(new ShardSearchFailure(t));
addShardFailure(shardRequestId, new ShardSearchFailure(t));
}
if (successulOps.get() == 0) {
// no successful ops, raise an exception
listener.onFailure(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
} else {
try {
moveToSecondPhase();
innerMoveToSecondPhase();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
@@ -279,7 +283,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
}
performFirstPhase(shardIt, nextShard);
performFirstPhase(shardRequestId, shardIt, nextShard);
} else {
// no more shards active, add a failure
// t is null when there is no next active shard...
@@ -294,9 +298,9 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
if (t == null) {
// no active shards
addShardFailure(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
addShardFailure(shardRequestId, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
} else {
addShardFailure(new ShardSearchFailure(t));
addShardFailure(shardRequestId, new ShardSearchFailure(t));
}
}
}
@@ -310,37 +314,41 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
protected final ShardSearchFailure[] buildShardFailures() {
Queue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but it's ok if it's not fully concurrently safe
// we simply try to return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = ConcurrentCollections.newQueue();
shardFailures = new AtomicArray<ShardSearchFailure>(shardsIts.size());
}
shardFailures.add(failure);
shardFailures.set(shardRequestId, failure);
}
/**
* Releases the search contexts of shards that are not referenced in docIdsToLoad.
*/
protected void releaseIrrelevantSearchContexts(Map<SearchShardTarget, QuerySearchResultProvider> queryResults,
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad) {
protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearchResultProvider> queryResults,
AtomicArray<ExtTIntArrayList> docIdsToLoad) {
if (docIdsToLoad == null) {
return;
}
// we only release search context that we did not fetch from if we are not scrolling
if (request.scroll() == null) {
for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) {
if (!docIdsToLoad.containsKey(entry.getKey())) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
if (docIdsToLoad.get(entry.index) == null) {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
if (node != null) { // should not happen (==null) but safeguard anyhow
searchService.sendFreeContext(node, entry.getValue().id(), request);
searchService.sendFreeContext(node, entry.value.queryResult().id(), request);
}
}
}
@@ -349,7 +357,13 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<FirstResult> listener);
protected abstract void processFirstPhaseResult(ShardRouting shard, FirstResult result);
protected final void processFirstPhaseResult(int shardRequestId, ShardRouting shard, FirstResult result) {
firstResults.set(shardRequestId, result);
}
final void innerMoveToSecondPhase() throws Exception {
moveToSecondPhase();
}
protected abstract void moveToSecondPhase() throws Exception;
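The shardRequestId in this class is simply the position of the shard group in shardsIts' iteration order, recomputed identically in each of the three dispatch loops above; that stable ordering is what lets firstResults, the failure array, and the second phase all agree on indices. The recurring idiom, reduced to its core:

int shardRequestId = -1;
for (ShardIterator shardIt : shardsIts) {
    shardRequestId++;                            // 0..shardsIts.size()-1, same order on every pass
    final int fShardRequestId = shardRequestId;  // effectively-final copy for the anonymous Runnable
    // ... performFirstPhase(fShardRequestId, shardIt), inline or on the search thread pool
}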

View File

@@ -126,7 +126,7 @@ public class ShardFieldDocSortedHitQueue extends PriorityQueue<ShardFieldDoc> {
// avoid random sort order that could lead to duplicates (bug #31241):
if (c == 0) {
// CHANGE: Add shard base tie breaking
c = docA.shardTarget().compareTo(docB.shardTarget());
c = docA.shardRequestId() - docB.shardRequestId();
if (c == 0) {
return docA.doc > docB.doc;
}
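Breaking ties on shardRequestId keeps the ordering deterministic while avoiding a full SearchShardTarget comparison. The bare subtraction is only safe because shard request ids are small non-negative ints; the overflow-proof spelling, had arbitrary ints been possible, would be:

c = Integer.compare(docA.shardRequestId(), docB.shardRequestId()); // never overflows (Java 7+)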

View File

@@ -0,0 +1,119 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* A list backed by an {@link AtomicReferenceArray} with potential null values, allowing
* the non-null values to be easily retrieved as a list using {@link #asList()}.
*/
public class AtomicArray<E> {
private static final AtomicArray EMPTY = new AtomicArray(0);
@SuppressWarnings("unchecked")
public static <E> E empty() {
return (E) EMPTY;
}
private final AtomicReferenceArray<E> array;
private volatile List<Entry<E>> nonNullList;
public AtomicArray(int size) {
array = new AtomicReferenceArray<E>(size);
}
/**
* The length of the array, i.e. the number of expected results, including slots that are still null.
*/
public int length() {
return array.length();
}
/**
* Sets the element at position {@code i} to the given value.
*
* @param i the index
* @param value the new value
*/
public void set(int i, E value) {
array.set(i, value);
if (nonNullList != null) { // read first: a volatile read is lighter than an unconditional write, and most of the time this is already null...
nonNullList = null;
}
}
/**
* Gets the current value at position {@code i}.
*
* @param i the index
* @return the current value
*/
public E get(int i) {
return array.get(i);
}
/**
* Returns the non-null values as a list, with each value wrapped in an {@link Entry}
* that retains its original index in the array.
*/
public List<Entry<E>> asList() {
if (nonNullList == null) {
if (array == null || array.length() == 0) {
nonNullList = ImmutableList.of();
} else {
List<Entry<E>> list = new ArrayList<Entry<E>>(array.length());
for (int i = 0; i < array.length(); i++) {
E e = array.get(i);
if (e != null) {
list.add(new Entry<E>(i, e));
}
}
nonNullList = list;
}
}
return nonNullList;
}
/**
* An entry within the array.
*/
public static class Entry<E> {
/**
* The original index of the value within the array.
*/
public final int index;
/**
* The value.
*/
public final E value;
public Entry(int index, E value) {
this.index = index;
this.value = value;
}
}
}
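A quick usage sketch of the class above: writers publish by index with volatile semantics, readers take a snapshot of the non-null values through asList(), and every set() invalidates the cached view so the next asList() rebuilds it (assumes java.util.List is imported):

AtomicArray<String> results = new AtomicArray<String>(2);
results.set(0, "first");                                 // volatile write via AtomicReferenceArray
List<AtomicArray.Entry<String>> view = results.asList(); // [(0, first)]; nulls are skipped
results.set(1, "second");                                // clears the cached list
view = results.asList();                                 // [(0, first), (1, second)]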

View File

@@ -23,8 +23,6 @@ import org.apache.lucene.util.PriorityQueue;
/**
* <p>Same as lucene {@link org.apache.lucene.search.HitQueue}.
*
*
*/
public class ScoreDocQueue extends PriorityQueue<ShardScoreDoc> {
@@ -34,7 +32,7 @@ public class ScoreDocQueue extends PriorityQueue<ShardScoreDoc> {
protected final boolean lessThan(ShardScoreDoc hitA, ShardScoreDoc hitB) {
if (hitA.score == hitB.score) {
int c = hitA.shardTarget().compareTo(hitB.shardTarget());
int c = hitA.shardRequestId() - hitB.shardRequestId();
if (c == 0) {
return hitA.doc > hitB.doc;
}

View File

@@ -19,7 +19,6 @@
package org.elasticsearch.search.controller;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.lucene.index.Term;
@@ -33,7 +32,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.ShardFieldDocSortedHitQueue;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.facet.Facet;
@@ -49,7 +48,6 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,12 +57,12 @@
*/
public class SearchPhaseController extends AbstractComponent {
public static Ordering<QuerySearchResultProvider> QUERY_RESULT_ORDERING = new Ordering<QuerySearchResultProvider>() {
public static Ordering<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = new Ordering<AtomicArray.Entry<? extends QuerySearchResultProvider>>() {
@Override
public int compare(@Nullable QuerySearchResultProvider o1, @Nullable QuerySearchResultProvider o2) {
int i = o1.shardTarget().index().compareTo(o2.shardTarget().index());
public int compare(@Nullable AtomicArray.Entry<? extends QuerySearchResultProvider> o1, @Nullable AtomicArray.Entry<? extends QuerySearchResultProvider> o2) {
int i = o1.value.shardTarget().index().compareTo(o2.value.shardTarget().index());
if (i == 0) {
i = o1.shardTarget().shardId() - o2.shardTarget().shardId();
i = o1.value.shardTarget().shardId() - o2.value.shardTarget().shardId();
}
return i;
}
@@ -86,13 +84,13 @@ public class SearchPhaseController extends AbstractComponent {
return optimizeSingleShard;
}
public AggregatedDfs aggregateDfs(Iterable<DfsSearchResult> results) {
public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) {
Map<Term, TermStatistics> termStatistics = XMaps.newNoNullKeysMap();
Map<String, CollectionStatistics> fieldStatistics = XMaps.newNoNullKeysMap();
long aggMaxDoc = 0;
for (DfsSearchResult result : results) {
final Term[] terms = result.terms();
final TermStatistics[] stats = result.termStatistics();
for (AtomicArray.Entry<DfsSearchResult> lEntry : results.asList()) {
final Term[] terms = lEntry.value.terms();
final TermStatistics[] stats = lEntry.value.termStatistics();
assert terms.length == stats.length;
for (int i = 0; i < terms.length; i++) {
assert terms[i] != null;
@@ -102,14 +100,14 @@ public class SearchPhaseController extends AbstractComponent {
// totalTermFrequency is an optional statistic we need to check if either one or both
// are set to -1 which means not present and then set it globally to -1
termStatistics.put(terms[i], new TermStatistics(existing.term(),
existing.docFreq() + stats[i].docFreq(),
optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq())));
existing.docFreq() + stats[i].docFreq(),
optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq())));
} else {
termStatistics.put(terms[i], stats[i]);
}
}
for (Map.Entry<String, CollectionStatistics> entry : result.fieldStatistics().entrySet()) {
for (Map.Entry<String, CollectionStatistics> entry : lEntry.value.fieldStatistics().entrySet()) {
assert entry.getKey() != null;
CollectionStatistics existing = fieldStatistics.get(entry.getKey());
if (existing != null) {
@@ -124,36 +122,39 @@ public class SearchPhaseController extends AbstractComponent {
fieldStatistics.put(entry.getKey(), entry.getValue());
}
}
aggMaxDoc += result.maxDoc();
aggMaxDoc += lEntry.value.maxDoc();
}
return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc);
}
private static long optionalSum(long left, long right) {
return Math.min(left, right) == -1 ? -1 : left + right;
return Math.min(left, right) == -1 ? -1 : left + right;
}
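optionalSum treats -1 as "statistic not available" and makes it absorbing: if either operand is -1 the aggregate stays -1, otherwise the two counts add up. Concretely:

optionalSum(10, 32); // 42: both shards reported totalTermFreq
optionalSum(-1, 32); // -1: one shard could not report it, so the aggregate is unknown
optionalSum(-1, -1); // -1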
public ShardDoc[] sortDocs(Collection<? extends QuerySearchResultProvider> results1) {
if (results1.isEmpty()) {
public ShardDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> results1) {
if (results1.asList().isEmpty()) {
return EMPTY;
}
if (optimizeSingleShard) {
boolean canOptimize = false;
QuerySearchResult result = null;
if (results1.size() == 1) {
int shardRequestId = -1;
if (results1.asList().size() == 1) {
canOptimize = true;
result = results1.iterator().next().queryResult();
result = results1.asList().get(0).value.queryResult();
shardRequestId = results1.asList().get(0).index;
} else {
// lets see if we only got hits from a single shard, if so, we can optimize...
for (QuerySearchResultProvider queryResult : results1) {
if (queryResult.queryResult().topDocs().scoreDocs.length > 0) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results1.asList()) {
if (entry.value.queryResult().topDocs().scoreDocs.length > 0) {
if (result != null) { // we already have one, can't really optimize
canOptimize = false;
break;
}
canOptimize = true;
result = queryResult.queryResult();
result = entry.value.queryResult();
shardRequestId = entry.index;
}
}
}
@@ -170,23 +171,22 @@ public class SearchPhaseController extends AbstractComponent {
ShardDoc[] docs = new ShardDoc[resultDocsSize];
for (int i = 0; i < resultDocsSize; i++) {
ScoreDoc scoreDoc = scoreDocs[result.from() + i];
docs[i] = new ShardFieldDoc(result.shardTarget(), scoreDoc.doc, scoreDoc.score, ((FieldDoc) scoreDoc).fields);
docs[i] = new ShardFieldDoc(shardRequestId, scoreDoc.doc, scoreDoc.score, ((FieldDoc) scoreDoc).fields);
}
return docs;
} else {
ShardDoc[] docs = new ShardDoc[resultDocsSize];
for (int i = 0; i < resultDocsSize; i++) {
ScoreDoc scoreDoc = scoreDocs[result.from() + i];
docs[i] = new ShardScoreDoc(result.shardTarget(), scoreDoc.doc, scoreDoc.score);
docs[i] = new ShardScoreDoc(shardRequestId, scoreDoc.doc, scoreDoc.score);
}
return docs;
}
}
}
List<? extends QuerySearchResultProvider> results = QUERY_RESULT_ORDERING.sortedCopy(results1);
QuerySearchResultProvider queryResultProvider = results.get(0);
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = QUERY_RESULT_ORDERING.sortedCopy(results1.asList());
QuerySearchResultProvider queryResultProvider = results.get(0).value;
int totalNumDocs = 0;
@@ -203,8 +203,8 @@ public class SearchPhaseController extends AbstractComponent {
for (int i = 0; i < fieldDocs.fields.length; i++) {
boolean allValuesAreNull = true;
boolean resolvedField = false;
for (QuerySearchResultProvider resultProvider : results) {
for (ScoreDoc doc : resultProvider.queryResult().topDocs().scoreDocs) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
for (ScoreDoc doc : entry.value.queryResult().topDocs().scoreDocs) {
FieldDoc fDoc = (FieldDoc) doc;
if (fDoc.fields[i] != null) {
allValuesAreNull = false;
@@ -227,12 +227,12 @@ public class SearchPhaseController extends AbstractComponent {
queue = new ShardFieldDocSortedHitQueue(fieldDocs.fields, queueSize);
// we need to accumulate for all and then filter the from
for (QuerySearchResultProvider resultProvider : results) {
QuerySearchResult result = resultProvider.queryResult();
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
QuerySearchResult result = entry.value.queryResult();
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
totalNumDocs += scoreDocs.length;
for (ScoreDoc doc : scoreDocs) {
ShardFieldDoc nodeFieldDoc = new ShardFieldDoc(result.shardTarget(), doc.doc, doc.score, ((FieldDoc) doc).fields);
ShardFieldDoc nodeFieldDoc = new ShardFieldDoc(entry.index, doc.doc, doc.score, ((FieldDoc) doc).fields);
if (queue.insertWithOverflow(nodeFieldDoc) == nodeFieldDoc) {
// filled the queue, break
break;
@@ -241,12 +241,12 @@ public class SearchPhaseController extends AbstractComponent {
}
} else {
queue = new ScoreDocQueue(queueSize); // we need to accumulate for all and then filter the from
for (QuerySearchResultProvider resultProvider : results) {
QuerySearchResult result = resultProvider.queryResult();
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
QuerySearchResult result = entry.value.queryResult();
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
totalNumDocs += scoreDocs.length;
for (ScoreDoc doc : scoreDocs) {
ShardScoreDoc nodeScoreDoc = new ShardScoreDoc(result.shardTarget(), doc.doc, doc.score);
ShardScoreDoc nodeScoreDoc = new ShardScoreDoc(entry.index, doc.doc, doc.score);
if (queue.insertWithOverflow(nodeScoreDoc) == nodeScoreDoc) {
// filled the queue, break
break;
@@ -277,31 +277,31 @@ public class SearchPhaseController extends AbstractComponent {
return shardDocs;
}
public Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad(ShardDoc[] shardDocs) {
Map<SearchShardTarget, ExtTIntArrayList> result = XMaps.newMap();
/**
* Builds an array, with potential null elements, of the doc ids to load per shard request id.
*/
public void fillDocIdsToLoad(AtomicArray<ExtTIntArrayList> docsIdsToLoad, ShardDoc[] shardDocs) {
for (ShardDoc shardDoc : shardDocs) {
ExtTIntArrayList list = result.get(shardDoc.shardTarget());
ExtTIntArrayList list = docsIdsToLoad.get(shardDoc.shardRequestId());
if (list == null) {
list = new ExtTIntArrayList(); // can't be shared!, uses unsafe on it later on
result.put(shardDoc.shardTarget(), list);
docsIdsToLoad.set(shardDoc.shardRequestId(), list);
}
list.add(shardDoc.docId());
}
return result;
}
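The filled array is sparse on purpose: a slot stays null for every shard that contributes nothing to the current page, which is exactly the signal releaseIrrelevantSearchContexts uses to free untouched contexts. For instance, with hypothetical values:

// sortedDocs happens to draw hits only from shards 0 and 2 of a 3-shard request
AtomicArray<ExtTIntArrayList> docIdsToLoad = new AtomicArray<ExtTIntArrayList>(3);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedDocs);
docIdsToLoad.get(0); // doc ids to fetch from shard 0
docIdsToLoad.get(1); // null -> shard 1 serves no hits on this page, its context can be freed
docIdsToLoad.get(2); // doc ids to fetch from shard 2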
public InternalSearchResponse merge(ShardDoc[] sortedDocs, Map<SearchShardTarget, ? extends QuerySearchResultProvider> queryResults, Map<SearchShardTarget, ? extends FetchSearchResultProvider> fetchResults) {
public InternalSearchResponse merge(ShardDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResults, AtomicArray<? extends FetchSearchResultProvider> fetchResults) {
boolean sorted = false;
int sortScoreIndex = -1;
QuerySearchResult querySearchResult;
try {
querySearchResult = Iterables.get(queryResults.values(), 0).queryResult();
} catch (IndexOutOfBoundsException e) {
// no results, return an empty response
if (queryResults.asList().isEmpty()) {
return InternalSearchResponse.EMPTY;
}
QuerySearchResult querySearchResult = queryResults.asList().get(0).value.queryResult();
if (querySearchResult.topDocs() instanceof TopFieldDocs) {
sorted = true;
TopFieldDocs fieldDocs = (TopFieldDocs) querySearchResult.queryResult().topDocs();
@@ -314,7 +314,7 @@ public class SearchPhaseController extends AbstractComponent {
// merge facets
InternalFacets facets = null;
if (!queryResults.isEmpty()) {
if (!queryResults.asList().isEmpty()) {
// we rely on the fact that the order of facets is the same on all query results
if (querySearchResult.facets() != null && querySearchResult.facets().facets() != null && !querySearchResult.facets().facets().isEmpty()) {
List<Facet> aggregatedFacets = Lists.newArrayList();
@@ -322,8 +322,8 @@ public class SearchPhaseController extends AbstractComponent {
for (Facet facet : querySearchResult.facets()) {
// aggregate each facet name into a single list, and aggregate it
namedFacets.clear();
for (QuerySearchResultProvider queryResultProvider : queryResults.values()) {
for (Facet facet1 : queryResultProvider.queryResult().facets()) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
for (Facet facet1 : entry.value.queryResult().facets()) {
if (facet.getName().equals(facet1.getName())) {
namedFacets.add(facet1);
}
@@ -342,13 +342,14 @@ public class SearchPhaseController extends AbstractComponent {
long totalHits = 0;
float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
for (QuerySearchResultProvider queryResultProvider : queryResults.values()) {
if (queryResultProvider.queryResult().searchTimedOut()) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
QuerySearchResult result = entry.value.queryResult();
if (result.searchTimedOut()) {
timedOut = true;
}
totalHits += queryResultProvider.queryResult().topDocs().totalHits;
if (!Float.isNaN(queryResultProvider.queryResult().topDocs().getMaxScore())) {
maxScore = Math.max(maxScore, queryResultProvider.queryResult().topDocs().getMaxScore());
totalHits += result.topDocs().totalHits;
if (!Float.isNaN(result.topDocs().getMaxScore())) {
maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
}
}
if (Float.isInfinite(maxScore)) {
@@ -356,15 +357,15 @@ public class SearchPhaseController extends AbstractComponent {
}
// clean the fetch counter
for (FetchSearchResultProvider fetchSearchResultProvider : fetchResults.values()) {
fetchSearchResultProvider.fetchResult().initCounter();
for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults.asList()) {
entry.value.fetchResult().initCounter();
}
// merge hits
List<InternalSearchHit> hits = new ArrayList<InternalSearchHit>();
if (!fetchResults.isEmpty()) {
if (!fetchResults.asList().isEmpty()) {
for (ShardDoc shardDoc : sortedDocs) {
FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardTarget());
FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardRequestId());
if (fetchResultProvider == null) {
continue;
}
@@ -390,12 +391,11 @@ public class SearchPhaseController extends AbstractComponent {
// merge suggest results
Suggest suggest = null;
if (!queryResults.isEmpty()) {
if (!queryResults.asList().isEmpty()) {
final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<String, List<Suggest.Suggestion>>();
boolean hasSuggestions = false;
for (QuerySearchResultProvider resultProvider : queryResults.values()) {
Suggest shardResult = resultProvider.queryResult().suggest();
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
Suggest shardResult = entry.value.queryResult().queryResult().suggest();
if (shardResult == null) {
continue;

View File

@@ -26,7 +26,7 @@ import org.elasticsearch.search.SearchShardTarget;
*/
public interface ShardDoc {
SearchShardTarget shardTarget();
int shardRequestId();
int docId();

View File

@@ -27,21 +27,21 @@ import org.elasticsearch.search.SearchShardTarget;
*/
public class ShardFieldDoc extends FieldDoc implements ShardDoc {
private final SearchShardTarget shardTarget;
private final int shardRequestId;
public ShardFieldDoc(SearchShardTarget shardTarget, int doc, float score) {
public ShardFieldDoc(int shardRequestId, int doc, float score) {
super(doc, score);
this.shardTarget = shardTarget;
this.shardRequestId = shardRequestId;
}
public ShardFieldDoc(SearchShardTarget shardTarget, int doc, float score, Object[] fields) {
public ShardFieldDoc(int shardRequestId, int doc, float score, Object[] fields) {
super(doc, score, fields);
this.shardTarget = shardTarget;
this.shardRequestId = shardRequestId;
}
@Override
public SearchShardTarget shardTarget() {
return this.shardTarget;
public int shardRequestId() {
return this.shardRequestId;
}
@Override

View File

@@ -20,23 +20,22 @@
package org.elasticsearch.search.controller;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.search.SearchShardTarget;
/**
*
*/
public class ShardScoreDoc extends ScoreDoc implements ShardDoc {
private final SearchShardTarget shardTarget;
private final int shardRequestId;
public ShardScoreDoc(SearchShardTarget shardTarget, int doc, float score) {
public ShardScoreDoc(int shardRequestId, int doc, float score) {
super(doc, score);
this.shardTarget = shardTarget;
this.shardRequestId = shardRequestId;
}
@Override
public SearchShardTarget shardTarget() {
return this.shardTarget;
public int shardRequestId() {
return this.shardRequestId;
}
@Override