move to use ScoreDoc/FieldDoc instead of our wrappers

Now that we have the concept of a shardIndex as part of our search execution, we can use Lucene's ScoreDoc and FieldDoc directly instead of maintaining our own wrapper classes that carried that information.
Also, rename shardRequestId to shardIndex where needed, to match the field name used in Lucene.
Shay Banon 2013-07-15 14:54:10 +02:00
parent 7098073a66
commit f28ff2becc
15 changed files with 115 additions and 259 deletions
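
For illustration (not part of the commit): a minimal sketch, assuming Lucene 4.x on the classpath and hypothetical doc/score/shard values, of what replacing the wrapper with Lucene's own shardIndex field looks like.

import org.apache.lucene.search.ScoreDoc;

public class ShardIndexSketch {
    public static void main(String[] args) {
        int shardIndex = 3; // hypothetical ordinal of the shard within the search execution
        // Before this commit: new ShardScoreDoc(shardIndex, 42, 1.5f) wrapped the Lucene doc
        // After: Lucene's ScoreDoc carries the shard ordinal itself
        ScoreDoc scoreDoc = new ScoreDoc(42, 1.5f);
        scoreDoc.shardIndex = shardIndex;
        System.out.println(scoreDoc); // doc=42 score=1.5 shardIndex=3
    }
}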

View File: org/elasticsearch/action/search/type/TransportSearchCountAction.java

@ -30,7 +30,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
@ -74,7 +73,7 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
@Override
protected void moveToSecondPhase() throws Exception {
// no need to sort, since we know we have no hits back
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
@ -82,6 +81,4 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
private static ShardDoc[] EMPTY_DOCS = new ShardDoc[0];
}

View File: org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java

@ -128,12 +128,12 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
}
}
void executeSecondPhase(final int shardRequestId, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new SearchServiceListener<QueryFetchSearchResult>() {
@Override
public void onResult(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryFetchResults.set(shardRequestId, result);
queryFetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@ -144,7 +144,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();

View File: org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java

@ -137,12 +137,12 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
void executeQuery(final int shardRequestId, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener<QuerySearchResult>() {
@Override
public void onResult(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryResults.set(shardRequestId, result);
queryResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
@ -153,7 +153,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
@ -230,12 +230,12 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
void executeFetch(final int shardRequestId, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.set(shardRequestId, result);
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@ -246,7 +246,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();

View File: org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java

@ -139,12 +139,12 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
}
}
void executeFetch(final int shardRequestId, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.set(shardRequestId, result);
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@ -155,7 +155,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardRequestId, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();

View File: org/elasticsearch/action/search/type/TransportSearchScanAction.java

@ -31,7 +31,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
@ -71,7 +70,7 @@ public class TransportSearchScanAction extends TransportSearchTypeAction {
@Override
protected void moveToSecondPhase() throws Exception {
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
@ -79,6 +78,4 @@ public class TransportSearchScanAction extends TransportSearchTypeAction {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
private static ShardDoc[] EMPTY_DOCS = new ShardDoc[0];
}

View File: org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
@ -32,7 +33,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
@ -112,11 +112,11 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
// we do our best to return the shard failures, but it's ok if it's not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<ShardSearchFailure>(scrollId.getContext().length);
}
shardFailures.set(shardRequestId, failure);
shardFailures.set(shardIndex, failure);
}
public void start() {
@ -167,18 +167,18 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardRequestId = i;
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(shardRequestId, node, target.v2());
executePhase(shardIndex, node, target.v2());
}
});
} else {
executePhase(shardRequestId, node, target.v2());
executePhase(shardIndex, node, target.v2());
}
}
}
@ -200,11 +200,11 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
}
}
private void executePhase(final int shardRequestId, DiscoveryNode node, final long searchId) {
private void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override
public void onResult(QueryFetchSearchResult result) {
queryFetchResults.set(shardRequestId, result);
queryFetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@ -215,7 +215,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardRequestId, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -233,7 +233,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
}
private void innerFinishHim() {
ShardDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {

View File: org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
@ -33,7 +34,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
@ -86,7 +86,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
private volatile ShardDoc[] sortedShardList;
private volatile ScoreDoc[] sortedShardList;
private final AtomicInteger successfulOps;
@ -117,11 +117,11 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
// we do our best to return the shard failures, but it's ok if it's not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<ShardSearchFailure>(scrollId.getContext().length);
}
shardFailures.set(shardRequestId, failure);
shardFailures.set(shardIndex, failure);
}
public void start() {
@ -173,18 +173,18 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardRequestId = i;
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQueryPhase(shardRequestId, counter, node, target.v2());
executeQueryPhase(shardIndex, counter, node, target.v2());
}
});
} else {
executeQueryPhase(shardRequestId, counter, node, target.v2());
executeQueryPhase(shardIndex, counter, node, target.v2());
}
}
}
@ -192,11 +192,11 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
}
private void executeQueryPhase(final int shardRequestId, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
searchService.sendExecuteQuery(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QuerySearchResult>() {
@Override
public void onResult(QuerySearchResult result) {
queryResults.set(shardRequestId, result);
queryResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
@ -207,7 +207,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardRequestId, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();

View File: org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java

@ -33,8 +33,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.controller.ShardScoreDoc;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
@ -115,11 +113,11 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
// we do our best to return the shard failures, but it's ok if it's not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<ShardSearchFailure>(scrollId.getContext().length);
}
shardFailures.set(shardRequestId, failure);
shardFailures.set(shardIndex, failure);
}
public void start() {
@ -171,18 +169,18 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
Tuple<String, Long>[] context1 = scrollId.getContext();
for (int i = 0; i < context1.length; i++) {
final Tuple<String, Long> target = context1[i];
final int shardRequestId = i;
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(shardRequestId, node, target.v2());
executePhase(shardIndex, node, target.v2());
}
});
} else {
executePhase(shardRequestId, node, target.v2());
executePhase(shardIndex, node, target.v2());
}
}
}
@ -204,11 +202,11 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
}
}
private void executePhase(final int shardRequestId, DiscoveryNode node, final long searchId) {
private void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override
public void onResult(QueryFetchSearchResult result) {
queryFetchResults.set(shardRequestId, result);
queryFetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
@ -219,7 +217,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardRequestId, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -245,12 +243,13 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
numberOfHits += entry.value.queryResult().topDocs().scoreDocs.length;
}
ShardDoc[] docs = new ShardDoc[numberOfHits];
ScoreDoc[] docs = new ScoreDoc[numberOfHits];
int counter = 0;
for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
ScoreDoc[] scoreDocs = entry.value.queryResult().topDocs().scoreDocs;
for (ScoreDoc scoreDoc : scoreDocs) {
docs[counter++] = new ShardScoreDoc(entry.index, scoreDoc.doc, 0.0f);
scoreDoc.shardIndex = entry.index;
docs[counter++] = scoreDoc;
}
}
final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults);

View File: org/elasticsearch/action/search/type/TransportSearchTypeAction.java

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.TransportAction;
@ -40,7 +41,6 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
@ -91,7 +91,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected final AtomicArray<FirstResult> firstResults;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
protected volatile ShardDoc[] sortedShardList;
protected volatile ScoreDoc[] sortedShardList;
protected final long startTime = System.currentTimeMillis();
@ -129,20 +129,20 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
request.beforeStart();
// count the local operations, and perform the non local ones
int localOperations = 0;
int shardRequestId = -1;
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardRequestId++;
shardIndex++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
// do the remote operation here, the localAsync flag is not relevant
performFirstPhase(shardRequestId, shardIt);
performFirstPhase(shardIndex, shardIt);
}
} else {
// really, no shards active in this group
onFirstPhaseResult(shardRequestId, null, shardIt, null);
onFirstPhaseResult(shardIndex, null, shardIt, null);
}
}
// we have local operations, perform them now
@ -152,13 +152,13 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
int shardRequestId = -1;
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardRequestId++;
shardIndex++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performFirstPhase(shardRequestId, shardIt);
performFirstPhase(shardIndex, shardIt);
}
}
}
@ -169,10 +169,10 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
if (localAsync) {
request.beforeLocalFork();
}
shardRequestId = -1;
shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardRequestId++;
final int fShardRequestId = shardRequestId;
shardIndex++;
final int fShardIndex = shardIndex;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
@ -180,11 +180,11 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
performFirstPhase(fShardRequestId, shardIt);
performFirstPhase(fShardIndex, shardIt);
}
});
} else {
performFirstPhase(fShardRequestId, shardIt);
performFirstPhase(fShardIndex, shardIt);
}
}
}
@ -193,38 +193,38 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
void performFirstPhase(final int shardRequestId, final ShardIterator shardIt) {
performFirstPhase(shardRequestId, shardIt, shardIt.nextOrNull());
void performFirstPhase(final int shardIndex, final ShardIterator shardIt) {
performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull());
}
void performFirstPhase(final int shardRequestId, final ShardIterator shardIt, final ShardRouting shard) {
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shardRequestId, null, shardIt, null);
onFirstPhaseResult(shardIndex, null, shardIt, null);
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shardRequestId, shard, shardIt, null);
onFirstPhaseResult(shardIndex, shard, shardIt, null);
} else {
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime), new SearchServiceListener<FirstResult>() {
@Override
public void onResult(FirstResult result) {
onFirstPhaseResult(shardRequestId, shard, result, shardIt);
onFirstPhaseResult(shardIndex, shard, result, shardIt);
}
@Override
public void onFailure(Throwable t) {
onFirstPhaseResult(shardRequestId, shard, shardIt, t);
onFirstPhaseResult(shardIndex, shard, shardIt, t);
}
});
}
}
}
void onFirstPhaseResult(int shardRequestId, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
processFirstPhaseResult(shardRequestId, shard, result);
processFirstPhaseResult(shardIndex, shard, result);
// increment all the "future" shards to update the total ops since some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
@ -241,7 +241,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
void onFirstPhaseResult(final int shardRequestId, @Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
if (totalOps.incrementAndGet() == expectedTotalOps) {
// t is null when there is no next active shard....
if (logger.isDebugEnabled()) {
@ -256,9 +256,9 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
// no more shards, add a failure
if (t == null) {
// no active shards
addShardFailure(shardRequestId, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
addShardFailure(shardIndex, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
} else {
addShardFailure(shardRequestId, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(t));
}
if (successulOps.get() == 0) {
// no successful ops, raise an exception
@ -283,7 +283,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
}
performFirstPhase(shardRequestId, shardIt, nextShard);
performFirstPhase(shardIndex, shardIt, nextShard);
} else {
// no more shards active, add a failure
// t is null when there is no next active shard....
@ -298,9 +298,9 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
if (t == null) {
// no active shards
addShardFailure(shardRequestId, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
addShardFailure(shardIndex, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
} else {
addShardFailure(shardRequestId, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(t));
}
}
}
@ -327,11 +327,11 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
// we do our best to return the shard failures, but it's ok if it's not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardRequestId, ShardSearchFailure failure) {
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<ShardSearchFailure>(shardsIts.size());
}
shardFailures.set(shardRequestId, failure);
shardFailures.set(shardIndex, failure);
}
/**
@ -357,8 +357,8 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<FirstResult> listener);
protected final void processFirstPhaseResult(int shardRequestId, ShardRouting shard, FirstResult result) {
firstResults.set(shardRequestId, result);
protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
firstResults.set(shardIndex, result);
}
final void innerMoveToSecondPhase() throws Exception {

View File: org/elasticsearch/search/controller/ScoreDocQueue.java

@ -19,20 +19,21 @@
package org.elasticsearch.search.controller;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.util.PriorityQueue;
/**
* <p>Same as Lucene's {@link org.apache.lucene.search.HitQueue}.
*/
public class ScoreDocQueue extends PriorityQueue<ShardScoreDoc> {
public class ScoreDocQueue extends PriorityQueue<ScoreDoc> {
public ScoreDocQueue(int size) {
super(size);
}
protected final boolean lessThan(ShardScoreDoc hitA, ShardScoreDoc hitB) {
protected final boolean lessThan(ScoreDoc hitA, ScoreDoc hitB) {
if (hitA.score == hitB.score) {
int c = hitA.shardRequestId() - hitB.shardRequestId();
int c = hitA.shardIndex - hitB.shardIndex;
if (c == 0) {
return hitA.doc > hitB.doc;
}
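
For context (an editor's sketch, not part of the commit; it assumes the usual lower-shard-first tie-break, with hypothetical scores and doc ids): the ordering the queue imposes on plain ScoreDocs, written as an equivalent comparator.

import org.apache.lucene.search.ScoreDoc;

import java.util.Arrays;
import java.util.Comparator;

public class TieBreakSketch {
    public static void main(String[] args) {
        ScoreDoc[] docs = {
                new ScoreDoc(7, 1.0f, 1), // doc 7 on shard 1
                new ScoreDoc(3, 1.0f, 0), // doc 3 on shard 0
                new ScoreDoc(1, 2.0f, 1), // doc 1 on shard 1
        };
        // Equivalent of ScoreDocQueue's lessThan turned into a sort order:
        // higher score first; ties broken by lower shardIndex, then lower doc id
        Arrays.sort(docs, new Comparator<ScoreDoc>() {
            @Override
            public int compare(ScoreDoc a, ScoreDoc b) {
                if (a.score != b.score) {
                    return a.score > b.score ? -1 : 1;
                }
                if (a.shardIndex != b.shardIndex) {
                    return a.shardIndex - b.shardIndex;
                }
                return a.doc - b.doc;
            }
        });
        for (ScoreDoc doc : docs) {
            System.out.println(doc); // shard 1/doc 1 first, then shard 0/doc 3, then shard 1/doc 7
        }
    }
}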

View File: org/elasticsearch/search/controller/SearchPhaseController.java

@ -67,7 +67,7 @@ public class SearchPhaseController extends AbstractComponent {
}
};
private static final ShardDoc[] EMPTY = new ShardDoc[0];
public static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
private final CacheRecycler cacheRecycler;
private final boolean optimizeSingleShard;
@ -130,19 +130,19 @@ public class SearchPhaseController extends AbstractComponent {
return Math.min(left, right) == -1 ? -1 : left + right;
}
public ShardDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> results1) {
public ScoreDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> results1) {
if (results1.asList().isEmpty()) {
return EMPTY;
return EMPTY_DOCS;
}
if (optimizeSingleShard) {
boolean canOptimize = false;
QuerySearchResult result = null;
int shardRequestId = -1;
int shardIndex = -1;
if (results1.asList().size() == 1) {
canOptimize = true;
result = results1.asList().get(0).value.queryResult();
shardRequestId = results1.asList().get(0).index;
shardIndex = results1.asList().get(0).index;
} else {
// lets see if we only got hits from a single shard, if so, we can optimize...
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results1.asList()) {
@ -153,31 +153,33 @@ public class SearchPhaseController extends AbstractComponent {
}
canOptimize = true;
result = entry.value.queryResult();
shardRequestId = entry.index;
shardIndex = entry.index;
}
}
}
if (canOptimize) {
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
if (scoreDocs.length < result.from()) {
return EMPTY;
return EMPTY_DOCS;
}
int resultDocsSize = result.size();
if ((scoreDocs.length - result.from()) < resultDocsSize) {
resultDocsSize = scoreDocs.length - result.from();
}
if (result.topDocs() instanceof TopFieldDocs) {
ShardDoc[] docs = new ShardDoc[resultDocsSize];
ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
for (int i = 0; i < resultDocsSize; i++) {
ScoreDoc scoreDoc = scoreDocs[result.from() + i];
docs[i] = new ShardFieldDoc(shardRequestId, scoreDoc.doc, scoreDoc.score, ((FieldDoc) scoreDoc).fields);
scoreDoc.shardIndex = shardIndex;
docs[i] = scoreDoc;
}
return docs;
} else {
ShardDoc[] docs = new ShardDoc[resultDocsSize];
ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
for (int i = 0; i < resultDocsSize; i++) {
ScoreDoc scoreDoc = scoreDocs[result.from() + i];
docs[i] = new ShardScoreDoc(shardRequestId, scoreDoc.doc, scoreDoc.score);
scoreDoc.shardIndex = shardIndex;
docs[i] = scoreDoc;
}
return docs;
}
@ -231,8 +233,8 @@ public class SearchPhaseController extends AbstractComponent {
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
totalNumDocs += scoreDocs.length;
for (ScoreDoc doc : scoreDocs) {
ShardFieldDoc nodeFieldDoc = new ShardFieldDoc(entry.index, doc.doc, doc.score, ((FieldDoc) doc).fields);
if (queue.insertWithOverflow(nodeFieldDoc) == nodeFieldDoc) {
doc.shardIndex = entry.index;
if (queue.insertWithOverflow(doc) == doc) {
// filled the queue, break
break;
}
@ -245,8 +247,8 @@ public class SearchPhaseController extends AbstractComponent {
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
totalNumDocs += scoreDocs.length;
for (ScoreDoc doc : scoreDocs) {
ShardScoreDoc nodeScoreDoc = new ShardScoreDoc(entry.index, doc.doc, doc.score);
if (queue.insertWithOverflow(nodeScoreDoc) == nodeScoreDoc) {
doc.shardIndex = entry.index;
if (queue.insertWithOverflow(doc) == doc) {
// filled the queue, break
break;
}
@ -265,32 +267,32 @@ public class SearchPhaseController extends AbstractComponent {
}
if (resultDocsSize <= 0) {
return EMPTY;
return EMPTY_DOCS;
}
// we only pop the first resultDocsSize docs; this handles "from" nicely since the "from" docs are down the queue
// that we already fetched, so we are effectively popping the "from" through "size" window
ShardDoc[] shardDocs = new ShardDoc[resultDocsSize];
ScoreDoc[] shardDocs = new ScoreDoc[resultDocsSize];
for (int i = resultDocsSize - 1; i >= 0; i--) // put docs in array
shardDocs[i] = (ShardDoc) queue.pop();
shardDocs[i] = (ScoreDoc) queue.pop();
return shardDocs;
}
/**
* Builds an array, with potential null elements, with docs to load.
*/
public void fillDocIdsToLoad(AtomicArray<ExtTIntArrayList> docsIdsToLoad, ShardDoc[] shardDocs) {
for (ShardDoc shardDoc : shardDocs) {
ExtTIntArrayList list = docsIdsToLoad.get(shardDoc.shardRequestId());
public void fillDocIdsToLoad(AtomicArray<ExtTIntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
for (ScoreDoc shardDoc : shardDocs) {
ExtTIntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
if (list == null) {
list = new ExtTIntArrayList(); // can't be shared! uses unsafe on it later on
docsIdsToLoad.set(shardDoc.shardRequestId(), list);
docsIdsToLoad.set(shardDoc.shardIndex, list);
}
list.add(shardDoc.docId());
list.add(shardDoc.doc);
}
}
public InternalSearchResponse merge(ShardDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResults, AtomicArray<? extends FetchSearchResultProvider> fetchResults) {
public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResults, AtomicArray<? extends FetchSearchResultProvider> fetchResults) {
boolean sorted = false;
int sortScoreIndex = -1;
@ -363,8 +365,8 @@ public class SearchPhaseController extends AbstractComponent {
// merge hits
List<InternalSearchHit> hits = new ArrayList<InternalSearchHit>();
if (!fetchResults.asList().isEmpty()) {
for (ShardDoc shardDoc : sortedDocs) {
FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardRequestId());
for (ScoreDoc shardDoc : sortedDocs) {
FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardIndex);
if (fetchResultProvider == null) {
continue;
}
@ -372,7 +374,7 @@ public class SearchPhaseController extends AbstractComponent {
int index = fetchResult.counterGetAndIncrement();
if (index < fetchResult.hits().internalHits().length) {
InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
searchHit.score(shardDoc.score());
searchHit.score(shardDoc.score);
searchHit.shard(fetchResult.shardTarget());
if (sorted) {

View File: org/elasticsearch/search/controller/ShardDoc.java (deleted)

@ -1,34 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.controller;
import org.elasticsearch.search.SearchShardTarget;
/**
*
*/
public interface ShardDoc {
int shardRequestId();
int docId();
float score();
}

View File: org/elasticsearch/search/controller/ShardFieldDoc.java (deleted)

@ -1,56 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.controller;
import org.apache.lucene.search.FieldDoc;
import org.elasticsearch.search.SearchShardTarget;
/**
*
*/
public class ShardFieldDoc extends FieldDoc implements ShardDoc {
private final int shardRequestId;
public ShardFieldDoc(int shardRequestId, int doc, float score) {
super(doc, score);
this.shardRequestId = shardRequestId;
}
public ShardFieldDoc(int shardRequestId, int doc, float score, Object[] fields) {
super(doc, score, fields);
this.shardRequestId = shardRequestId;
}
@Override
public int shardRequestId() {
return this.shardRequestId;
}
@Override
public int docId() {
return this.doc;
}
@Override
public float score() {
return score;
}
}

View File: org/elasticsearch/search/controller/ShardFieldDocSortedHitQueue.java

@ -20,11 +20,11 @@
package org.elasticsearch.search.controller;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.search.controller.ShardFieldDoc;
import java.io.IOException;
@ -32,7 +32,7 @@ import java.io.IOException;
*
*/
// LUCENE TRACK, had to copy over in order to improve the same-order tie-break to take shards into account
public class ShardFieldDocSortedHitQueue extends PriorityQueue<ShardFieldDoc> {
public class ShardFieldDocSortedHitQueue extends PriorityQueue<FieldDoc> {
volatile SortField[] fields = null;
@ -93,7 +93,7 @@ public class ShardFieldDocSortedHitQueue extends PriorityQueue<ShardFieldDoc> {
*/
@SuppressWarnings("unchecked")
@Override
protected final boolean lessThan(final ShardFieldDoc docA, final ShardFieldDoc docB) {
protected final boolean lessThan(final FieldDoc docA, final FieldDoc docB) {
final int n = fields.length;
int c = 0;
for (int i = 0; i < n && c == 0; ++i) {
@ -126,7 +126,7 @@ public class ShardFieldDocSortedHitQueue extends PriorityQueue<ShardFieldDoc> {
// avoid random sort order that could lead to duplicates (bug #31241):
if (c == 0) {
// CHANGE: Add shard base tie breaking
c = docA.shardRequestId() - docB.shardRequestId();
c = docA.shardIndex - docB.shardIndex;
if (c == 0) {
return docA.doc > docB.doc;
}

View File: org/elasticsearch/search/controller/ShardScoreDoc.java (deleted)

@ -1,50 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.controller;
import org.apache.lucene.search.ScoreDoc;
/**
*
*/
public class ShardScoreDoc extends ScoreDoc implements ShardDoc {
private final int shardRequestId;
public ShardScoreDoc(int shardRequestId, int doc, float score) {
super(doc, score);
this.shardRequestId = shardRequestId;
}
@Override
public int shardRequestId() {
return this.shardRequestId;
}
@Override
public int docId() {
return doc;
}
@Override
public float score() {
return score;
}
}