This change adds a soft limit to open scroll contexts that can be controlled with the dynamic cluster setting `search.max_open_scroll_context` (defaults to 500).
This commit is contained in:
parent
fdec331b13
commit
d27aa72b17
|
@ -125,6 +125,11 @@ TIP: Keeping older segments alive means that more file handles are needed.
|
||||||
Ensure that you have configured your nodes to have ample free file handles.
|
Ensure that you have configured your nodes to have ample free file handles.
|
||||||
See <<file-descriptors>>.
|
See <<file-descriptors>>.
|
||||||
|
|
||||||
|
NOTE: To prevent against issues caused by having too many scrolls open, the
|
||||||
|
user is not allowed to open scrolls past a certain limit. By default, the
|
||||||
|
maximum number of open scrolls is 500. This limit can be updated with the
|
||||||
|
`search.max_open_scroll_context` cluster setting.
|
||||||
|
|
||||||
You can check how many search contexts are open with the
|
You can check how many search contexts are open with the
|
||||||
<<cluster-nodes-stats,nodes stats API>>:
|
<<cluster-nodes-stats,nodes stats API>>:
|
||||||
|
|
||||||
|
|
|
@ -387,6 +387,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
SearchService.MAX_KEEPALIVE_SETTING,
|
SearchService.MAX_KEEPALIVE_SETTING,
|
||||||
MultiBucketConsumerService.MAX_BUCKET_SETTING,
|
MultiBucketConsumerService.MAX_BUCKET_SETTING,
|
||||||
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
|
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
|
||||||
|
SearchService.MAX_OPEN_SCROLL_CONTEXT,
|
||||||
Node.WRITE_PORTS_FILE_SETTING,
|
Node.WRITE_PORTS_FILE_SETTING,
|
||||||
Node.NODE_NAME_SETTING,
|
Node.NODE_NAME_SETTING,
|
||||||
Node.NODE_DATA_SETTING,
|
Node.NODE_DATA_SETTING,
|
||||||
|
|
|
@ -112,6 +112,7 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
@ -145,6 +146,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
public static final Setting<Boolean> DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS =
|
public static final Setting<Boolean> DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS =
|
||||||
Setting.boolSetting("search.default_allow_partial_results", true, Property.Dynamic, Property.NodeScope);
|
Setting.boolSetting("search.default_allow_partial_results", true, Property.Dynamic, Property.NodeScope);
|
||||||
|
|
||||||
|
public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT =
|
||||||
|
Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope);
|
||||||
|
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
@ -174,6 +178,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
|
|
||||||
private volatile boolean lowLevelCancellation;
|
private volatile boolean lowLevelCancellation;
|
||||||
|
|
||||||
|
private volatile int maxOpenScrollContext;
|
||||||
|
|
||||||
private final Cancellable keepAliveReaper;
|
private final Cancellable keepAliveReaper;
|
||||||
|
|
||||||
private final AtomicLong idGenerator = new AtomicLong();
|
private final AtomicLong idGenerator = new AtomicLong();
|
||||||
|
@ -182,6 +188,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
|
|
||||||
private final MultiBucketConsumerService multiBucketConsumerService;
|
private final MultiBucketConsumerService multiBucketConsumerService;
|
||||||
|
|
||||||
|
private final AtomicInteger openScrollContexts = new AtomicInteger();
|
||||||
|
|
||||||
public SearchService(ClusterService clusterService, IndicesService indicesService,
|
public SearchService(ClusterService clusterService, IndicesService indicesService,
|
||||||
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
|
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
|
||||||
ResponseCollectorService responseCollectorService) {
|
ResponseCollectorService responseCollectorService) {
|
||||||
|
@ -212,6 +220,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
|
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
|
||||||
this::setDefaultAllowPartialSearchResults);
|
this::setDefaultAllowPartialSearchResults);
|
||||||
|
|
||||||
|
maxOpenScrollContext = MAX_OPEN_SCROLL_CONTEXT.get(settings);
|
||||||
|
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_SCROLL_CONTEXT, this::setMaxOpenScrollContext);
|
||||||
|
|
||||||
lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
|
lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
|
||||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
|
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
|
||||||
|
@ -243,6 +253,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
return defaultAllowPartialSearchResults;
|
return defaultAllowPartialSearchResults;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setMaxOpenScrollContext(int maxOpenScrollContext) {
|
||||||
|
this.maxOpenScrollContext = maxOpenScrollContext;
|
||||||
|
}
|
||||||
|
|
||||||
private void setLowLevelCancellation(Boolean lowLevelCancellation) {
|
private void setLowLevelCancellation(Boolean lowLevelCancellation) {
|
||||||
this.lowLevelCancellation = lowLevelCancellation;
|
this.lowLevelCancellation = lowLevelCancellation;
|
||||||
}
|
}
|
||||||
|
@ -592,11 +606,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
}
|
}
|
||||||
|
|
||||||
final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException {
|
final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException {
|
||||||
|
if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) {
|
||||||
|
throw new ElasticsearchException(
|
||||||
|
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
|
||||||
|
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
|
||||||
|
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
|
||||||
|
}
|
||||||
|
|
||||||
SearchContext context = createContext(request);
|
SearchContext context = createContext(request);
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
putContext(context);
|
putContext(context);
|
||||||
if (request.scroll() != null) {
|
if (request.scroll() != null) {
|
||||||
|
openScrollContexts.incrementAndGet();
|
||||||
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
|
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
|
||||||
}
|
}
|
||||||
context.indexShard().getSearchOperationListener().onNewContext(context);
|
context.indexShard().getSearchOperationListener().onNewContext(context);
|
||||||
|
@ -696,6 +718,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
|
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
|
||||||
context.indexShard().getSearchOperationListener().onFreeContext(context);
|
context.indexShard().getSearchOperationListener().onFreeContext(context);
|
||||||
if (context.scrollContext() != null) {
|
if (context.scrollContext() != null) {
|
||||||
|
openScrollContexts.decrementAndGet();
|
||||||
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
|
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -21,12 +21,14 @@ package org.elasticsearch.search;
|
||||||
import com.carrotsearch.hppc.IntArrayList;
|
import com.carrotsearch.hppc.IntArrayList;
|
||||||
import org.apache.lucene.search.Query;
|
import org.apache.lucene.search.Query;
|
||||||
import org.apache.lucene.store.AlreadyClosedException;
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.search.SearchTask;
|
import org.elasticsearch.action.search.SearchTask;
|
||||||
import org.elasticsearch.action.search.SearchType;
|
import org.elasticsearch.action.search.SearchType;
|
||||||
|
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
|
@ -76,6 +78,7 @@ import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -417,6 +420,44 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* test that creating more than the allowed number of scroll contexts throws an exception
|
||||||
|
*/
|
||||||
|
public void testMaxOpenScrollContexts() throws RuntimeException {
|
||||||
|
createIndex("index");
|
||||||
|
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||||
|
|
||||||
|
final SearchService service = getInstanceFromNode(SearchService.class);
|
||||||
|
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
|
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
|
||||||
|
final IndexShard indexShard = indexService.getShard(0);
|
||||||
|
|
||||||
|
// Open all possible scrolls, clear some of them, then open more until the limit is reached
|
||||||
|
LinkedList<String> clearScrollIds = new LinkedList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY); i++) {
|
||||||
|
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
|
||||||
|
|
||||||
|
if (randomInt(4) == 0) clearScrollIds.addLast(searchResponse.getScrollId());
|
||||||
|
}
|
||||||
|
|
||||||
|
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
|
||||||
|
clearScrollRequest.setScrollIds(clearScrollIds);
|
||||||
|
client().clearScroll(clearScrollRequest);
|
||||||
|
|
||||||
|
for (int i = 0; i < clearScrollIds.size(); i++) {
|
||||||
|
client().prepareSearch("index").setSize(1).setScroll("1m").get();
|
||||||
|
}
|
||||||
|
|
||||||
|
ElasticsearchException ex = expectThrows(ElasticsearchException.class,
|
||||||
|
() -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId())));
|
||||||
|
assertEquals(
|
||||||
|
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
|
||||||
|
SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " +
|
||||||
|
"This limit can be set by changing the [search.max_open_scroll_context] setting.",
|
||||||
|
ex.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
|
public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
|
||||||
@Override
|
@Override
|
||||||
public List<QuerySpec<?>> getQueries() {
|
public List<QuerySpec<?>> getQueries() {
|
||||||
|
@ -472,6 +513,22 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class ShardScrollRequestTest extends ShardSearchLocalRequest {
|
||||||
|
private Scroll scroll;
|
||||||
|
|
||||||
|
ShardScrollRequestTest(ShardId shardId) {
|
||||||
|
super(shardId, 1, SearchType.DEFAULT, new SearchSourceBuilder(),
|
||||||
|
new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, true, null, null);
|
||||||
|
|
||||||
|
this.scroll = new Scroll(TimeValue.timeValueMinutes(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Scroll scroll() {
|
||||||
|
return this.scroll;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testCanMatch() throws IOException {
|
public void testCanMatch() throws IOException {
|
||||||
createIndex("index");
|
createIndex("index");
|
||||||
final SearchService service = getInstanceFromNode(SearchService.class);
|
final SearchService service = getInstanceFromNode(SearchService.class);
|
||||||
|
|
Loading…
Reference in New Issue