[SEARCH] close active contexts on SearchService#close()
When we close a node all pending / active search requests need to be cleared otherwise a node will wait up to 30 sec for shutdown sicne there could be open scroll requests. This behavior was introduces in 1.5 such that versions <= 1.4.x are not affected. Closes #8940
This commit is contained in:
parent
a63a055f63
commit
e47b753617
|
@ -139,7 +139,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
||||||
private final ImmutableMap<String, SearchParseElement> elementParsers;
|
private final ImmutableMap<String, SearchParseElement> elementParsers;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, IndicesWarmer indicesWarmer, ThreadPool threadPool,
|
public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService,IndicesWarmer indicesWarmer, ThreadPool threadPool,
|
||||||
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
|
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
|
||||||
IndicesQueryCache indicesQueryCache) {
|
IndicesQueryCache indicesQueryCache) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
@ -196,6 +196,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws ElasticsearchException {
|
protected void doClose() throws ElasticsearchException {
|
||||||
|
doStop();
|
||||||
FutureUtils.cancel(keepAliveReaper);
|
FutureUtils.cancel(keepAliveReaper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -774,6 +775,14 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of active contexts in this
|
||||||
|
* SearchService
|
||||||
|
*/
|
||||||
|
public int getActiveContexts() {
|
||||||
|
return this.activeContexts.size();
|
||||||
|
}
|
||||||
|
|
||||||
static class NormsWarmer extends IndicesWarmer.Listener {
|
static class NormsWarmer extends IndicesWarmer.Listener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.search;
|
||||||
|
|
||||||
|
|
||||||
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
|
||||||
|
public class SearchServiceTests extends ElasticsearchSingleNodeTest {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean resetNodeAfterTest() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testClearOnClose() throws ExecutionException, InterruptedException {
|
||||||
|
createIndex("index");
|
||||||
|
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefresh(true).get();
|
||||||
|
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
|
||||||
|
assertThat(searchResponse.getScrollId(), is(notNullValue()));
|
||||||
|
SearchService service = getInstanceFromNode(SearchService.class);
|
||||||
|
|
||||||
|
assertEquals(1, service.getActiveContexts());
|
||||||
|
service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test
|
||||||
|
assertEquals(0, service.getActiveContexts());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testClearOnStop() throws ExecutionException, InterruptedException {
|
||||||
|
createIndex("index");
|
||||||
|
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefresh(true).get();
|
||||||
|
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
|
||||||
|
assertThat(searchResponse.getScrollId(), is(notNullValue()));
|
||||||
|
SearchService service = getInstanceFromNode(SearchService.class);
|
||||||
|
|
||||||
|
assertEquals(1, service.getActiveContexts());
|
||||||
|
service.doStop();
|
||||||
|
assertEquals(0, service.getActiveContexts());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testClearIndexDelete() throws ExecutionException, InterruptedException {
|
||||||
|
createIndex("index");
|
||||||
|
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefresh(true).get();
|
||||||
|
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
|
||||||
|
assertThat(searchResponse.getScrollId(), is(notNullValue()));
|
||||||
|
SearchService service = getInstanceFromNode(SearchService.class);
|
||||||
|
|
||||||
|
assertEquals(1, service.getActiveContexts());
|
||||||
|
assertAcked(client().admin().indices().prepareDelete("index"));
|
||||||
|
assertEquals(0, service.getActiveContexts());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue