Fix concurrent requests race over scroll context limit (#53449)
Concurrent search scroll requests can lead to more scroll contexts than the limit.
This commit is contained in:
parent
2789fe4179
commit
fe2f6b359e
|
@ -642,7 +642,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
boolean success = false;
|
||||
try {
|
||||
if (context.scrollContext() != null) {
|
||||
openScrollContexts.incrementAndGet();
|
||||
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
|
||||
}
|
||||
context.indexShard().getSearchOperationListener().onNewContext(context);
|
||||
|
@ -661,14 +660,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
final SearchContext createContext(SearchRewriteContext rewriteContext) throws IOException {
|
||||
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
|
||||
try {
|
||||
if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) {
|
||||
throw new ElasticsearchException(
|
||||
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
|
||||
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
|
||||
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
|
||||
}
|
||||
final ShardSearchRequest request = rewriteContext.request;
|
||||
if (request.scroll() != null) {
|
||||
context.addReleasable(openScrollContexts::decrementAndGet, Lifetime.CONTEXT);
|
||||
if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) {
|
||||
throw new ElasticsearchException(
|
||||
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
|
||||
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
|
||||
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
|
||||
}
|
||||
context.scrollContext(new ScrollContext());
|
||||
context.scrollContext().scroll = request.scroll();
|
||||
}
|
||||
|
@ -768,7 +768,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
assert activeContexts.containsKey(context.id().getId()) == false;
|
||||
context.indexShard().getSearchOperationListener().onFreeContext(context);
|
||||
if (context.scrollContext() != null) {
|
||||
openScrollContexts.decrementAndGet();
|
||||
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -539,6 +539,47 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
ex.getMessage());
|
||||
}
|
||||
|
||||
public void testOpenScrollContextsConcurrently() throws Exception {
|
||||
createIndex("index");
|
||||
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
final IndexShard indexShard = indicesService.indexServiceSafe(resolveIndex("index")).getShard(0);
|
||||
|
||||
final int maxScrollContexts = SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY);
|
||||
final SearchService searchService = getInstanceFromNode(SearchService.class);
|
||||
Thread[] threads = new Thread[randomIntBetween(2, 8)];
|
||||
CountDownLatch latch = new CountDownLatch(threads.length);
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i] = new Thread(() -> {
|
||||
latch.countDown();
|
||||
try {
|
||||
latch.await();
|
||||
for (; ; ) {
|
||||
SearchService.SearchRewriteContext rewriteContext =
|
||||
searchService.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard);
|
||||
try {
|
||||
searchService.createAndPutContext(rewriteContext);
|
||||
} catch (ElasticsearchException e) {
|
||||
assertThat(e.getMessage(), equalTo(
|
||||
"Trying to create too many scroll contexts. Must be less than or equal to: " +
|
||||
"[" + maxScrollContexts + "]. " +
|
||||
"This limit can be set by changing the [search.max_open_scroll_context] setting."));
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
threads[i].setName("elasticsearch[node_s_0][search]");
|
||||
threads[i].start();
|
||||
}
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
assertThat(searchService.getActiveContexts(), equalTo(maxScrollContexts));
|
||||
searchService.freeAllScrollContexts();
|
||||
}
|
||||
|
||||
public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
|
||||
@Override
|
||||
public List<QuerySpec<?>> getQueries() {
|
||||
|
|
Loading…
Reference in New Issue