Merge pull request #11434 from brwe/open-search-contexts

search: release search contexts after failed dfs or query phase for d…
This commit is contained in:
Britta Weber 2015-06-01 13:15:51 +02:00
commit d453699643
5 changed files with 126 additions and 12 deletions

View File

@ -91,7 +91,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
} }
} }
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) { void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() { searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
@Override @Override
public void onResponse(QueryFetchSearchResult result) { public void onResponse(QueryFetchSearchResult result) {
@ -104,7 +104,14 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); try {
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected execution)
// and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
}
} }
}); });
} }

View File

@ -100,7 +100,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
} }
} }
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) { void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() { searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
@Override @Override
public void onResponse(QuerySearchResult result) { public void onResponse(QuerySearchResult result) {
@ -113,7 +113,14 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); try {
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected execution)
// and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
}
} }
}); });
} }
@ -176,6 +183,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
} }
}); });

View File

@ -35,8 +35,8 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.QuerySearchResultProvider;
@ -118,7 +118,10 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
// the failure might happen without managing to clear the search context..., potentially need to clear its context (for example) // the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
docIdsToLoad.set(shardIndex, null); docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
} }

View File

@ -303,9 +303,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) { for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try { try {
DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
if (node != null) { // should not happen (==null) but safeguard anyhow sendReleaseSearchContext(entry.value.id(), node);
searchService.sendFreeContext(node, entry.value.id(), request);
}
} catch (Throwable t1) { } catch (Throwable t1) {
logger.trace("failed to release context", t1); logger.trace("failed to release context", t1);
} }
@ -329,9 +327,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try { try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
if (node != null) { // should not happen (==null) but safeguard anyhow sendReleaseSearchContext(entry.value.queryResult().id(), node);
searchService.sendFreeContext(node, entry.value.queryResult().id(), request);
}
} catch (Throwable t1) { } catch (Throwable t1) {
logger.trace("failed to release context", t1); logger.trace("failed to release context", t1);
} }
@ -340,6 +336,12 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
} }
} }
protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
if (node != null) {
searchService.sendFreeContext(node, contextId, request);
}
}
protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry, ScoreDoc[] lastEmittedDocPerShard) { protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry, ScoreDoc[] lastEmittedDocPerShard) {
if (lastEmittedDocPerShard != null) { if (lastEmittedDocPerShard != null) {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
@LuceneTestCase.Slow
public class SearchWithRejectionsTests extends ElasticsearchIntegrationTest {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder().put(super.nodeSettings(nodeOrdinal))
.put("threadpool.search.type", "fixed")
.put("threadpool.search.size", 1)
.put("threadpool.search.queue_size", 1)
.build();
}
@Test
public void testOpenContextsAfterRejections() throws InterruptedException {
createIndex("test");
ensureGreen("test");
final int docs = scaledRandomIntBetween(20, 50);
for (int i = 0; i < docs; i++) {
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value").execute().actionGet();
}
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats().execute().actionGet();
assertThat(indicesStats.getTotal().getSearch().getOpenContexts(), equalTo(0l));
refresh();
int numSearches = 10;
Future<SearchResponse>[] responses = new Future[numSearches];
SearchType searchType = randomFrom(SearchType.DEFAULT, SearchType.QUERY_AND_FETCH, SearchType.QUERY_THEN_FETCH, SearchType.DFS_QUERY_AND_FETCH, SearchType.DFS_QUERY_THEN_FETCH);
logger.info("search type is {}", searchType);
for (int i = 0; i < numSearches; i++) {
responses[i] = client().prepareSearch()
.setQuery(matchAllQuery())
.setSearchType(searchType)
.execute();
}
for (int i = 0; i < numSearches; i++) {
try {
responses[i].get();
} catch (Throwable t) {
}
}
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
// we must wait here because the requests to release search contexts might still be in flight
// although the search request has already returned
return client().admin().indices().prepareStats().execute().actionGet().getTotal().getSearch().getOpenContexts() == 0;
}
}, 1, TimeUnit.SECONDS);
indicesStats = client().admin().indices().prepareStats().execute().actionGet();
assertThat(indicesStats.getTotal().getSearch().getOpenContexts(), equalTo(0l));
}
}