From e47b753617302d0860d048fed0b6cd0ab2f6dcbb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 15 Dec 2014 09:41:31 +0100 Subject: [PATCH] [SEARCH] close active contexts on SearchService#close() When we close a node all pending / active search requests need to be cleared otherwise a node will wait up to 30 sec for shutdown sicne there could be open scroll requests. This behavior was introduces in 1.5 such that versions <= 1.4.x are not affected. Closes #8940 --- .../elasticsearch/search/SearchService.java | 11 ++- .../search/SearchServiceTests.java | 73 +++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 src/test/java/org/elasticsearch/search/SearchServiceTests.java diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index 9da011a2d7e..40865b7dbe8 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -139,7 +139,7 @@ public class SearchService extends AbstractLifecycleComponent { private final ImmutableMap elementParsers; @Inject - public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, IndicesWarmer indicesWarmer, ThreadPool threadPool, + public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService,IndicesWarmer indicesWarmer, ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesQueryCache indicesQueryCache) { super(settings); @@ -196,6 +196,7 @@ public class SearchService extends AbstractLifecycleComponent { @Override protected void doClose() throws ElasticsearchException { + doStop(); FutureUtils.cancel(keepAliveReaper); } @@ -774,6 +775,14 @@ public class SearchService extends AbstractLifecycleComponent { } } + /** + * Returns the number of active contexts in this + * SearchService + */ + public int getActiveContexts() { + return this.activeContexts.size(); + } + static class NormsWarmer extends IndicesWarmer.Listener { @Override diff --git a/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/src/test/java/org/elasticsearch/search/SearchServiceTests.java new file mode 100644 index 00000000000..2578257af7c --- /dev/null +++ b/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search; + + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.test.ElasticsearchSingleNodeTest; + +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class SearchServiceTests extends ElasticsearchSingleNodeTest { + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testClearOnClose() throws ExecutionException, InterruptedException { + createIndex("index"); + client().prepareIndex("index", "type", "1").setSource("field", "value").setRefresh(true).get(); + SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get(); + assertThat(searchResponse.getScrollId(), is(notNullValue())); + SearchService service = getInstanceFromNode(SearchService.class); + + assertEquals(1, service.getActiveContexts()); + service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + assertEquals(0, service.getActiveContexts()); + } + + public void testClearOnStop() throws ExecutionException, InterruptedException { + createIndex("index"); + client().prepareIndex("index", "type", "1").setSource("field", "value").setRefresh(true).get(); + SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get(); + assertThat(searchResponse.getScrollId(), is(notNullValue())); + SearchService service = getInstanceFromNode(SearchService.class); + + assertEquals(1, service.getActiveContexts()); + service.doStop(); + assertEquals(0, service.getActiveContexts()); + } + + public void testClearIndexDelete() throws ExecutionException, InterruptedException { + createIndex("index"); + client().prepareIndex("index", "type", "1").setSource("field", "value").setRefresh(true).get(); + SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get(); + assertThat(searchResponse.getScrollId(), is(notNullValue())); + SearchService service = getInstanceFromNode(SearchService.class); + + assertEquals(1, service.getActiveContexts()); + assertAcked(client().admin().indices().prepareDelete("index")); + assertEquals(0, service.getActiveContexts()); + } +}