diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1220e76c703..ac1cbb2578a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -642,7 +642,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv boolean success = false; try { if (context.scrollContext() != null) { - openScrollContexts.incrementAndGet(); context.indexShard().getSearchOperationListener().onNewScrollContext(context); } context.indexShard().getSearchOperationListener().onNewContext(context); @@ -661,14 +660,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv final SearchContext createContext(SearchRewriteContext rewriteContext) throws IOException { final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout); try { - if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) { - throw new ElasticsearchException( - "Trying to create too many scroll contexts. Must be less than or equal to: [" + - maxOpenScrollContext + "]. " + "This limit can be set by changing the [" - + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); - } final ShardSearchRequest request = rewriteContext.request; if (request.scroll() != null) { + context.addReleasable(openScrollContexts::decrementAndGet, Lifetime.CONTEXT); + if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) { + throw new ElasticsearchException( + "Trying to create too many scroll contexts. Must be less than or equal to: [" + + maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); + } context.scrollContext(new ScrollContext()); context.scrollContext().scroll = request.scroll(); } @@ -768,7 +768,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv assert activeContexts.containsKey(context.id().getId()) == false; context.indexShard().getSearchOperationListener().onFreeContext(context); if (context.scrollContext() != null) { - openScrollContexts.decrementAndGet(); context.indexShard().getSearchOperationListener().onFreeScrollContext(context); } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 3c4f47aedcd..3e7303c5a90 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -539,6 +539,47 @@ public class SearchServiceTests extends ESSingleNodeTestCase { ex.getMessage()); } + public void testOpenScrollContextsConcurrently() throws Exception { + createIndex("index"); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexShard indexShard = indicesService.indexServiceSafe(resolveIndex("index")).getShard(0); + + final int maxScrollContexts = SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY); + final SearchService searchService = getInstanceFromNode(SearchService.class); + Thread[] threads = new Thread[randomIntBetween(2, 8)]; + CountDownLatch latch = new CountDownLatch(threads.length); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + latch.countDown(); + try { + latch.await(); + for (; ; ) { + SearchService.SearchRewriteContext rewriteContext = + searchService.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard); + try { + searchService.createAndPutContext(rewriteContext); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), equalTo( + "Trying to create too many scroll contexts. Must be less than or equal to: " + + "[" + maxScrollContexts + "]. " + + "This limit can be set by changing the [search.max_open_scroll_context] setting.")); + return; + } + } + } catch (Exception e) { + throw new AssertionError(e); + } + }); + threads[i].setName("elasticsearch[node_s_0][search]"); + threads[i].start(); + } + for (Thread thread : threads) { + thread.join(); + } + assertThat(searchService.getActiveContexts(), equalTo(maxScrollContexts)); + searchService.freeAllScrollContexts(); + } + public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin { @Override public List> getQueries() {