Core: surgically removed slow scroll, because master (2.0) requires full cluster restart coming from previous versions.
This commit is contained in:
parent
8b5bc2643e
commit
7ac713aedc
|
@ -122,11 +122,10 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
|
|||
}
|
||||
|
||||
private void finishHim() {
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) {
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
|
||||
@Override
|
||||
public void doRun() throws IOException {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults);
|
||||
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
|
|
|
@ -143,7 +143,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
|
|||
}
|
||||
|
||||
void innerExecuteFetchPhase() throws Exception {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
boolean useScroll = request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
|
||||
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
|
||||
|
||||
|
|
|
@ -46,8 +46,8 @@ import java.util.Map;
|
|||
*/
|
||||
public abstract class TransportSearchHelper {
|
||||
|
||||
public static ShardSearchTransportRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis, boolean useSlowScroll) {
|
||||
return new ShardSearchTransportRequest(request, shardRouting, numberOfShards, useSlowScroll, filteringAliases, nowInMillis);
|
||||
public static ShardSearchTransportRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) {
|
||||
return new ShardSearchTransportRequest(request, shardRouting, numberOfShards, filteringAliases, nowInMillis);
|
||||
}
|
||||
|
||||
public static InternalScrollSearchRequest internalScrollSearchRequest(long id, SearchScrollRequest request) {
|
||||
|
|
|
@ -78,7 +78,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
|
|||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
|
||||
@Override
|
||||
public void doRun() throws IOException {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
boolean useScroll = request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
|
||||
String scrollId = null;
|
||||
|
|
|
@ -85,7 +85,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
|
|||
|
||||
@Override
|
||||
protected void moveToSecondPhase() throws Exception {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
boolean useScroll = request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
|
||||
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
|
||||
|
||||
|
|
|
@ -67,12 +67,8 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
|
|||
private class AsyncAction extends AbstractAsyncAction {
|
||||
|
||||
private final SearchScrollRequest request;
|
||||
private volatile boolean useSlowScroll;
|
||||
|
||||
private final ActionListener<SearchResponse> listener;
|
||||
|
||||
private final ParsedScrollId scrollId;
|
||||
|
||||
private final DiscoveryNodes nodes;
|
||||
|
||||
private volatile AtomicArray<ShardSearchFailure> shardFailures;
|
||||
|
@ -192,7 +188,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
|
|||
}
|
||||
|
||||
private void innerFinishHim() throws Exception {
|
||||
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(!useSlowScroll, queryFetchResults);
|
||||
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
|
|
|
@ -85,8 +85,6 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
|
|||
|
||||
private final AtomicInteger successfulOps;
|
||||
|
||||
private volatile boolean useSlowScroll;
|
||||
|
||||
private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
@ -191,7 +189,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
|
|||
}
|
||||
|
||||
private void executeFetchPhase() throws Exception {
|
||||
sortedShardList = searchPhaseController.sortDocs(!useSlowScroll, queryResults);
|
||||
sortedShardList = searchPhaseController.sortDocs(true, queryResults);
|
||||
AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
|
||||
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
|
||||
|
||||
|
@ -201,12 +199,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
|
|||
}
|
||||
|
||||
|
||||
final ScoreDoc[] lastEmittedDocPerShard;
|
||||
if (useSlowScroll) {
|
||||
lastEmittedDocPerShard = new ScoreDoc[queryResults.length()];
|
||||
} else {
|
||||
lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
|
||||
}
|
||||
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
|
||||
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
|
||||
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
|
||||
IntArrayList docIds = entry.value;
|
||||
|
|
|
@ -99,8 +99,6 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
|||
private final Object shardFailuresMutex = new Object();
|
||||
protected volatile ScoreDoc[] sortedShardList;
|
||||
|
||||
protected final boolean useSlowScroll;
|
||||
|
||||
protected BaseAsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
@ -124,7 +122,6 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
|||
expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
|
||||
|
||||
firstResults = new AtomicArray<>(shardsIts.size());
|
||||
this.useSlowScroll = false;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
|
@ -157,7 +154,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
|||
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
||||
} else {
|
||||
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
|
||||
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime(), useSlowScroll), new SearchServiceListener<FirstResult>() {
|
||||
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new SearchServiceListener<FirstResult>() {
|
||||
@Override
|
||||
public void onResult(FirstResult result) {
|
||||
onFirstPhaseResult(shardIndex, shard, result, shardIt);
|
||||
|
|
|
@ -39,7 +39,7 @@ class ShardDfsOnlyRequest extends BroadcastShardOperationRequest {
|
|||
|
||||
ShardDfsOnlyRequest(ShardRouting shardRouting, int numberOfShards, @Nullable String[] filteringAliases, long nowInMillis, DfsOnlyRequest request) {
|
||||
super(shardRouting.shardId(), request);
|
||||
this.shardSearchRequest = new ShardSearchTransportRequest(request.getSearchRequest(), shardRouting, numberOfShards, false,
|
||||
this.shardSearchRequest = new ShardSearchTransportRequest(request.getSearchRequest(), shardRouting, numberOfShards,
|
||||
filteringAliases, nowInMillis);
|
||||
}
|
||||
|
||||
|
|
|
@ -668,16 +668,6 @@ public class PercolateContext extends SearchContext {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useSlowScroll() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext useSlowScroll(boolean useSlowScroll) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Counter timeEstimateCounter() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
|
@ -540,7 +540,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
SearchContext.setCurrent(context);
|
||||
try {
|
||||
context.scroll(request.scroll());
|
||||
context.useSlowScroll(request.useSlowScroll());
|
||||
|
||||
parseTemplate(request);
|
||||
parseSource(context, request.source());
|
||||
|
@ -722,7 +721,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
* handles this as well since the result is always size * shards for Q_A_F
|
||||
*/
|
||||
private void shortcutDocIdsToLoad(SearchContext context) {
|
||||
if (!context.useSlowScroll() && context.request().scroll() != null) {
|
||||
if (context.request().scroll() != null) {
|
||||
TopDocs topDocs = context.queryResult().topDocs();
|
||||
int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
|
||||
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
|
||||
|
|
|
@ -138,10 +138,11 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param scrollSort Whether to ignore the from and sort all hits in each shard result. Only used for scroll search
|
||||
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
|
||||
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
|
||||
* @param resultsArr Shard result holder
|
||||
*/
|
||||
public ScoreDoc[] sortDocs(boolean scrollSort, AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
|
||||
public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
|
||||
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
|
||||
if (results.isEmpty()) {
|
||||
return EMPTY_DOCS;
|
||||
|
@ -171,7 +172,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
}
|
||||
if (canOptimize) {
|
||||
int offset = result.from();
|
||||
if (scrollSort) {
|
||||
if (ignoreFrom) {
|
||||
offset = 0;
|
||||
}
|
||||
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
|
||||
|
@ -220,7 +221,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
shardTopDocs[sortedResult.index] = topDocs;
|
||||
}
|
||||
int from = firstResult.queryResult().from();
|
||||
if (scrollSort) {
|
||||
if (ignoreFrom) {
|
||||
from = 0;
|
||||
}
|
||||
// TopDocs#merge can't deal with null shard TopDocs
|
||||
|
|
|
@ -175,8 +175,6 @@ public class DefaultSearchContext extends SearchContext {
|
|||
|
||||
private volatile long lastAccessTime = -1;
|
||||
|
||||
private volatile boolean useSlowScroll;
|
||||
|
||||
private InnerHitsContext innerHitsContext;
|
||||
|
||||
public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
|
||||
|
@ -689,16 +687,6 @@ public class DefaultSearchContext extends SearchContext {
|
|||
return mapperService().smartNameObjectMapper(name, request.types());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useSlowScroll() {
|
||||
return useSlowScroll;
|
||||
}
|
||||
|
||||
public DefaultSearchContext useSlowScroll(boolean useSlowScroll) {
|
||||
this.useSlowScroll = useSlowScroll;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Counter timeEstimateCounter() {
|
||||
return timeEstimateCounter;
|
||||
|
|
|
@ -553,16 +553,6 @@ public abstract class FilteredSearchContext extends SearchContext {
|
|||
return in.smartNameObjectMapper(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useSlowScroll() {
|
||||
return in.useSlowScroll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext useSlowScroll(boolean useSlowScroll) {
|
||||
return in.useSlowScroll(useSlowScroll);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Counter timeEstimateCounter() {
|
||||
return in.timeEstimateCounter();
|
||||
|
|
|
@ -351,10 +351,6 @@ public abstract class SearchContext implements Releasable {
|
|||
|
||||
public abstract MapperService.SmartNameObjectMapper smartNameObjectMapper(String name);
|
||||
|
||||
public abstract boolean useSlowScroll();
|
||||
|
||||
public abstract SearchContext useSlowScroll(boolean useSlowScroll);
|
||||
|
||||
public abstract Counter timeEstimateCounter();
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,10 +19,8 @@
|
|||
|
||||
package org.elasticsearch.search.internal;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.search.type.ParsedScrollId;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -85,13 +83,11 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
|
||||
private long nowInMillis;
|
||||
|
||||
private boolean useSlowScroll;
|
||||
|
||||
ShardSearchLocalRequest() {
|
||||
}
|
||||
|
||||
ShardSearchLocalRequest(SearchRequest searchRequest, ShardRouting shardRouting, int numberOfShards,
|
||||
boolean useSlowScroll, String[] filteringAliases, long nowInMillis) {
|
||||
String[] filteringAliases, long nowInMillis) {
|
||||
this(shardRouting.shardId(), numberOfShards, searchRequest.searchType(),
|
||||
searchRequest.source(), searchRequest.types(), searchRequest.queryCache());
|
||||
|
||||
|
@ -101,7 +97,6 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
this.templateType = searchRequest.templateType();
|
||||
this.templateParams = searchRequest.templateParams();
|
||||
this.scroll = searchRequest.scroll();
|
||||
this.useSlowScroll = useSlowScroll;
|
||||
this.filteringAliases = filteringAliases;
|
||||
this.nowInMillis = nowInMillis;
|
||||
}
|
||||
|
@ -207,11 +202,6 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
return scroll;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useSlowScroll() {
|
||||
return useSlowScroll;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void innerReadFrom(StreamInput in) throws IOException {
|
||||
index = in.readString();
|
||||
|
@ -235,7 +225,6 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
if (in.readBoolean()) {
|
||||
templateParams = (Map<String, Object>) in.readGenericValue();
|
||||
}
|
||||
useSlowScroll = in.readBoolean();
|
||||
queryCache = in.readOptionalBoolean();
|
||||
}
|
||||
|
||||
|
@ -268,7 +257,6 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
if (existTemplateParams) {
|
||||
out.writeGenericValue(templateParams);
|
||||
}
|
||||
out.writeBoolean(useSlowScroll);
|
||||
out.writeOptionalBoolean(queryCache);
|
||||
}
|
||||
|
||||
|
|
|
@ -66,15 +66,6 @@ public interface ShardSearchRequest {
|
|||
|
||||
Scroll scroll();
|
||||
|
||||
/**
|
||||
* This setting is internal and will be enabled when at least one node is on versions 1.0.x and 1.1.x to enable
|
||||
* scrolling that those versions support.
|
||||
*
|
||||
* @return Whether the scrolling should use regular search and incrementing the from on each round, which can
|
||||
* bring down nodes due to the big priority queues being generated to accommodate from + size hits for sorting.
|
||||
*/
|
||||
boolean useSlowScroll();
|
||||
|
||||
/**
|
||||
* Returns the cache key for this shard search request, based on its content
|
||||
*/
|
||||
|
|
|
@ -50,10 +50,9 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
|||
}
|
||||
|
||||
public ShardSearchTransportRequest(SearchRequest searchRequest, ShardRouting shardRouting, int numberOfShards,
|
||||
boolean useSlowScroll, String[] filteringAliases, long nowInMillis) {
|
||||
String[] filteringAliases, long nowInMillis) {
|
||||
super(searchRequest);
|
||||
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardRouting, numberOfShards,
|
||||
useSlowScroll, filteringAliases, nowInMillis);
|
||||
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardRouting, numberOfShards, filteringAliases, nowInMillis);
|
||||
this.originalIndices = new OriginalIndices(searchRequest);
|
||||
}
|
||||
|
||||
|
@ -153,11 +152,6 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
|||
return shardSearchLocalRequest.scroll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useSlowScroll() {
|
||||
return shardSearchLocalRequest.useSlowScroll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
|
|
|
@ -345,11 +345,6 @@ public class SubSearchContext extends FilteredSearchContext {
|
|||
return searchLookup;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext useSlowScroll(boolean useSlowScroll) {
|
||||
throw new UnsupportedOperationException("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Counter timeEstimateCounter() {
|
||||
throw new UnsupportedOperationException("Not supported");
|
||||
|
|
|
@ -115,7 +115,7 @@ public class QueryPhase implements SearchPhase {
|
|||
topDocs = searchContext.scanContext().execute(searchContext);
|
||||
} else {
|
||||
// Perhaps have a dedicated scroll phase?
|
||||
if (!searchContext.useSlowScroll() && searchContext.request().scroll() != null) {
|
||||
if (searchContext.request().scroll() != null) {
|
||||
numDocs = searchContext.size();
|
||||
ScoreDoc lastEmittedDoc = searchContext.lastEmittedDoc();
|
||||
if (searchContext.sort() != null) {
|
||||
|
|
|
@ -579,16 +579,6 @@ public class TestSearchContext extends SearchContext {
|
|||
public void doClose() throws ElasticsearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useSlowScroll() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext useSlowScroll(boolean useSlowScroll) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Counter timeEstimateCounter() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
Loading…
Reference in New Issue