diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index 93042815e00..0be39abba14 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -41,7 +41,9 @@ import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; @@ -159,7 +161,9 @@ public class TransportSearchScrollScanAction extends AbstractComponent { searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new ActionListener() { @Override public void onResponse(ScrollQueryFetchSearchResult result) { - queryFetchResults.set(shardIndex, result.result()); + QueryFetchSearchResult shardResult = result.result(); + Objects.requireNonNull(shardResult, "QueryFetchSearchResult can't be null"); + queryFetchResults.setOnce(shardIndex, shardResult); if (counter.decrementAndGet() == 0) { finishHim(); } @@ -197,25 +201,27 @@ public class TransportSearchScrollScanAction extends AbstractComponent { private void innerFinishHim() throws IOException { int numberOfHits = 0; - for (AtomicArray.Entry entry : queryFetchResults.asList()) { + List> entries = queryFetchResults.asList(); + for (AtomicArray.Entry entry : entries) { numberOfHits += entry.value.queryResult().topDocs().scoreDocs.length; } - ScoreDoc[] docs = new ScoreDoc[numberOfHits]; - int counter = 0; - for (AtomicArray.Entry entry : queryFetchResults.asList()) { + List docs = new ArrayList<>(numberOfHits); + for (AtomicArray.Entry entry : entries) { ScoreDoc[] scoreDocs = entry.value.queryResult().topDocs().scoreDocs; for (ScoreDoc scoreDoc : scoreDocs) { scoreDoc.shardIndex = entry.index; - docs[counter++] = scoreDoc; + docs.add(scoreDoc); } } - final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(docs.toArray(new ScoreDoc[0]), queryFetchResults, queryFetchResults); ((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.getAttributes().get("total_hits")); - for (AtomicArray.Entry entry : queryFetchResults.asList()) { + for (AtomicArray.Entry entry : entries) { if (entry.value.queryResult().topDocs().scoreDocs.length < entry.value.queryResult().size()) { - // we found more than we want for this round, remove this from our scrolling + // we found more than we want for this round, remove this from our scrolling, so we don't go back to + // this shard, since all hits have been processed. + // The SearchContext already gets freed on the node holding the shard, via a similar check. queryFetchResults.set(entry.index, null); } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java index 93644d0d80f..38953c51b02 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java @@ -67,6 +67,15 @@ public class AtomicArray { } } + public final void setOnce(int i, E value) { + if (array.compareAndSet(i, null, value) == false) { + throw new IllegalStateException("index [" + i + "] has already been set"); + } + if (nonNullList != null) { // read first, lighter, and most times it will be null... + nonNullList = null; + } + } + /** * Gets the current value at position {@code i}. *