diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index ee92c694209..a122cad2af1 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -141,6 +141,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } catch (Throwable t) { + docIdsToLoad.set(entry.index, null); // clear it, we didn't manage to do anything with it onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); } } @@ -162,6 +163,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi @Override public void onFailure(Throwable t) { + // the failure might happen without managing to clear the search context..., potentially need to clear its context (for example) + docIdsToLoad.set(shardIndex, null); onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); } }); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index d29d82207f3..3c9aa834fff 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.search.type; import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.search.*; @@ -155,7 +156,7 @@ public abstract class TransportSearchTypeAction extends TransportAction expectedTotalOps) { + raiseEarlyFailure(new ElasticsearchIllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]")); } } @@ -288,12 +288,12 @@ public abstract class TransportSearchTypeAction extends TransportAction entry : firstResults.asList()) { + try { + DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); + if (node != null) { // should not happen (==null) but safeguard anyhow + searchService.sendFreeContext(node, entry.value.id(), request); + } + } catch (Throwable t1) { + logger.trace("failed to release context", t1); + } + } + listener.onFailure(t); + } + /** * Releases shard targets that are not used in the docsIdsToLoad. */ @@ -391,9 +405,13 @@ public abstract class TransportSearchTypeAction extends TransportAction entry : queryResults.asList()) { if (docIdsToLoad.get(entry.index) == null) { - DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); - if (node != null) { // should not happen (==null) but safeguard anyhow - searchService.sendFreeContext(node, entry.value.queryResult().id(), request); + try { + DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); + if (node != null) { // should not happen (==null) but safeguard anyhow + searchService.sendFreeContext(node, entry.value.queryResult().id(), request); + } + } catch (Throwable t1) { + logger.trace("failed to release context", t1); } } } diff --git a/src/test/java/org/elasticsearch/action/RejectionActionTests.java b/src/test/java/org/elasticsearch/action/RejectionActionTests.java new file mode 100644 index 00000000000..313339b1515 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/RejectionActionTests.java @@ -0,0 +1,110 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import com.google.common.collect.Lists; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.junit.Test; + +import java.util.Locale; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.equalTo; + +/** + */ +@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes = 2) +public class RejectionActionTests extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder() + .put("threadpool.search.size", 1) + .put("threadpool.search.queue_size", 1) + .put("threadpool.index.size", 1) + .put("threadpool.index.queue_size", 1) + .put("threadpool.get.size", 1) + .put("threadpool.get.queue_size", 1) + .build(); + } + + + @Test + public void simulateSearchRejectionLoad() throws Throwable { + for (int i = 0; i < 10; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "1").get(); + } + + int numberOfAsyncOps = randomIntBetween(200, 700); + final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps); + final CopyOnWriteArrayList responses = Lists.newCopyOnWriteArrayList(); + for (int i = 0; i < numberOfAsyncOps; i++) { + client().prepareSearch("test") + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setQuery(QueryBuilders.matchQuery("field", "1")) + .execute(new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + responses.add(searchResponse); + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + responses.add(e); + latch.countDown(); + } + }); + } + latch.await(); + assertThat(responses.size(), equalTo(numberOfAsyncOps)); + + // validate all responses + for (Object response : responses) { + if (response instanceof SearchResponse) { + SearchResponse searchResponse = (SearchResponse) response; + for (ShardSearchFailure failure : searchResponse.getShardFailures()) { + assertTrue("got unexpected reason..." + failure.reason(), failure.reason().toLowerCase(Locale.ENGLISH).contains("rejected")); + } + } else { + Throwable t = (Throwable) response; + Throwable unwrap = ExceptionsHelper.unwrapCause(t); + if (unwrap instanceof SearchPhaseExecutionException) { + SearchPhaseExecutionException e = (SearchPhaseExecutionException) unwrap; + for (ShardSearchFailure failure : e.shardFailures()) { + assertTrue("got unexpected reason..." + failure.reason(), failure.reason().toLowerCase(Locale.ENGLISH).contains("rejected")); + } + } else { + throw new AssertionError("unexpected failure", (Throwable) response); + } + } + } + } +}