diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 83e9aba54f0..01eefdb564d 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -91,7 +91,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc } } - void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) { + void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { @Override public void onResponse(QueryFetchSearchResult result) { @@ -104,7 +104,14 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc @Override public void onFailure(Throwable t) { - onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + try { + onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + } finally { + // the query might not have been executed at all (for example because thread pool rejected execution) + // and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + sendReleaseSearchContext(querySearchRequest.id(), node); + } } }); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index c1a361903e8..3a4e6d70533 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -100,7 +100,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } } - void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) { + void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { @Override public void onResponse(QuerySearchResult result) { @@ -113,7 +113,14 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA @Override public void onFailure(Throwable t) { - onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + try { + onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + } finally { + // the query might not have been executed at all (for example because thread pool rejected execution) + // and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + sendReleaseSearchContext(querySearchRequest.id(), node); + } } }); } @@ -176,6 +183,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA @Override public void onFailure(Throwable t) { + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. in this case we need to send a dedicated + // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared + // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. + docIdsToLoad.set(shardIndex, null); onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); } }); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 175a770e9c6..79a164c12ef 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -35,8 +35,8 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResultProvider; @@ -118,7 +118,10 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi @Override public void onFailure(Throwable t) { - // the failure might happen without managing to clear the search context..., potentially need to clear its context (for example) + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. in this case we need to send a dedicated + // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared + // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. docIdsToLoad.set(shardIndex, null); onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index e2cf4d87f53..4c1210920d7 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -303,9 +303,7 @@ public abstract class TransportSearchTypeAction extends TransportAction entry : firstResults.asList()) { try { DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); - if (node != null) { // should not happen (==null) but safeguard anyhow - searchService.sendFreeContext(node, entry.value.id(), request); - } + sendReleaseSearchContext(entry.value.id(), node); } catch (Throwable t1) { logger.trace("failed to release context", t1); } @@ -329,9 +327,7 @@ public abstract class TransportSearchTypeAction extends TransportAction entry, ScoreDoc[] lastEmittedDocPerShard) { if (lastEmittedDocPerShard != null) { ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; diff --git a/src/test/java/org/elasticsearch/search/SearchWithRejectionsTests.java b/src/test/java/org/elasticsearch/search/SearchWithRejectionsTests.java new file mode 100644 index 00000000000..322679a584e --- /dev/null +++ b/src/test/java/org/elasticsearch/search/SearchWithRejectionsTests.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search; + +import com.google.common.base.Predicate; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE) +@LuceneTestCase.Slow +public class SearchWithRejectionsTests extends ElasticsearchIntegrationTest { + @Override + public Settings nodeSettings(int nodeOrdinal) { + return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) + .put("threadpool.search.type", "fixed") + .put("threadpool.search.size", 1) + .put("threadpool.search.queue_size", 1) + .build(); + } + + @Test + public void testOpenContextsAfterRejections() throws InterruptedException { + createIndex("test"); + ensureGreen("test"); + final int docs = scaledRandomIntBetween(20, 50); + for (int i = 0; i < docs; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value").execute().actionGet(); + } + IndicesStatsResponse indicesStats = client().admin().indices().prepareStats().execute().actionGet(); + assertThat(indicesStats.getTotal().getSearch().getOpenContexts(), equalTo(0l)); + refresh(); + + int numSearches = 10; + Future[] responses = new Future[numSearches]; + SearchType searchType = randomFrom(SearchType.DEFAULT, SearchType.QUERY_AND_FETCH, SearchType.QUERY_THEN_FETCH, SearchType.DFS_QUERY_AND_FETCH, SearchType.DFS_QUERY_THEN_FETCH); + logger.info("search type is {}", searchType); + for (int i = 0; i < numSearches; i++) { + responses[i] = client().prepareSearch() + .setQuery(matchAllQuery()) + .setSearchType(searchType) + .execute(); + } + for (int i = 0; i < numSearches; i++) { + try { + responses[i].get(); + } catch (Throwable t) { + } + } + awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + // we must wait here because the requests to release search contexts might still be in flight + // although the search request has already returned + return client().admin().indices().prepareStats().execute().actionGet().getTotal().getSearch().getOpenContexts() == 0; + } + }, 1, TimeUnit.SECONDS); + indicesStats = client().admin().indices().prepareStats().execute().actionGet(); + assertThat(indicesStats.getTotal().getSearch().getOpenContexts(), equalTo(0l)); + } +}