Search might not return on thread pool rejection

When a thread pool rejects the execution on the local node, the search might not return.
This happens due to the fact that we move to the next shard only *within* the execution on the thread pool in the start method. If it fails to submit the task to the thread pool, it will go through the fail shard logic, but without "counting" the current shard itself. When this happens, the relevant shard will then execute more times than intended, causing the total opes counter to skew, and for example, if on another shard the search is successful, the total ops will be incremented *beyond* the expectedTotalOps, causing the check on == as the exit condition to never happen.
The fix here makes sure that the shard iterator properly progresses even in the case of rejections, and also includes improvement to when cleaning a context is sent in case of failures (which were exposed by the test).
Though the change fixes the problem, we should work on simplifying the code path considerably, the first suggestion as a followup is to remove the support for operation threading (also in broadcast), and move the local optimization execution to SearchService, this will simplify the code in different search action considerably, and will allow to remove the problematic #firstOrNull method on the shard iterator.
The second suggestion is to move the optimization of local execution to the TransportService, so all actions will not have to explicitly do the mentioned optimization.
fixes #4887
This commit is contained in:
Shay Banon 2014-05-04 03:08:50 +02:00
parent e96e634d10
commit 342a32fb16
3 changed files with 148 additions and 17 deletions

View File

@ -141,6 +141,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
} }
} catch (Throwable t) { } catch (Throwable t) {
docIdsToLoad.set(entry.index, null); // clear it, we didn't manage to do anything with it
onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter);
} }
} }
@ -162,6 +163,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
// the failure might happen without managing to clear the search context..., potentially need to clear its context (for example)
docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
} }
}); });

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.search.type;
import com.carrotsearch.hppc.IntArrayList; import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.*;
@ -155,7 +156,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
localOperations++; localOperations++;
} else { } else {
// do the remote operation here, the localAsync flag is not relevant // do the remote operation here, the localAsync flag is not relevant
performFirstPhase(shardIndex, shardIt); performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull());
} }
} else { } else {
// really, no shards active in this group // really, no shards active in this group
@ -175,7 +176,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
final ShardRouting shard = shardIt.firstOrNull(); final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) { if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
performFirstPhase(shardIndex, shardIt); performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull());
} }
} }
} }
@ -190,22 +191,23 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
for (final ShardIterator shardIt : shardsIts) { for (final ShardIterator shardIt : shardsIts) {
shardIndex++; shardIndex++;
final int fShardIndex = shardIndex; final int fShardIndex = shardIndex;
final ShardRouting shard = shardIt.firstOrNull(); ShardRouting first = shardIt.firstOrNull();
if (shard != null) { if (first != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (first.currentNodeId().equals(nodes.localNodeId())) {
final ShardRouting shard = shardIt.nextOrNull();
if (localAsync) { if (localAsync) {
try { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override @Override
public void run() { public void run() {
performFirstPhase(fShardIndex, shardIt); performFirstPhase(fShardIndex, shardIt, shard);
} }
}); });
} catch (Throwable t) { } catch (Throwable t) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t); onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t);
} }
} else { } else {
performFirstPhase(fShardIndex, shardIt); performFirstPhase(fShardIndex, shardIt, shard);
} }
} }
} }
@ -214,10 +216,6 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
} }
} }
void performFirstPhase(final int shardIndex, final ShardIterator shardIt) {
performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull());
}
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) { if (shard == null) {
// no more active shards... (we should not really get here, but just for safety) // no more active shards... (we should not really get here, but just for safety)
@ -260,8 +258,10 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e); logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
} }
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures())); raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
} }
} else if (xTotalOps > expectedTotalOps) {
raiseEarlyFailure(new ElasticsearchIllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]"));
} }
} }
@ -288,12 +288,12 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
logger.debug("All shards failed for phase: [{}]", firstPhaseName(), t); logger.debug("All shards failed for phase: [{}]", firstPhaseName(), t);
} }
// no successful ops, raise an exception // no successful ops, raise an exception
listener.onFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", buildShardFailures())); raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", buildShardFailures()));
} else { } else {
try { try {
innerMoveToSecondPhase(); innerMoveToSecondPhase();
} catch (Throwable e) { } catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures())); raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
} }
} }
} else { } else {
@ -379,6 +379,20 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
} }
} }
private void raiseEarlyFailure(Throwable t) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
if (node != null) { // should not happen (==null) but safeguard anyhow
searchService.sendFreeContext(node, entry.value.id(), request);
}
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
}
}
listener.onFailure(t);
}
/** /**
* Releases shard targets that are not used in the docsIdsToLoad. * Releases shard targets that are not used in the docsIdsToLoad.
*/ */
@ -391,9 +405,13 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
if (request.scroll() == null) { if (request.scroll() == null) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) { for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
if (docIdsToLoad.get(entry.index) == null) { if (docIdsToLoad.get(entry.index) == null) {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); try {
if (node != null) { // should not happen (==null) but safeguard anyhow DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
searchService.sendFreeContext(node, entry.value.queryResult().id(), request); if (node != null) { // should not happen (==null) but safeguard anyhow
searchService.sendFreeContext(node, entry.value.queryResult().id(), request);
}
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
} }
} }
} }

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import com.google.common.collect.Lists;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes = 2)
public class RejectionActionTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put("threadpool.search.size", 1)
.put("threadpool.search.queue_size", 1)
.put("threadpool.index.size", 1)
.put("threadpool.index.queue_size", 1)
.put("threadpool.get.size", 1)
.put("threadpool.get.queue_size", 1)
.build();
}
@Test
public void simulateSearchRejectionLoad() throws Throwable {
for (int i = 0; i < 10; i++) {
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "1").get();
}
int numberOfAsyncOps = randomIntBetween(200, 700);
final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
final CopyOnWriteArrayList<Object> responses = Lists.newCopyOnWriteArrayList();
for (int i = 0; i < numberOfAsyncOps; i++) {
client().prepareSearch("test")
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.matchQuery("field", "1"))
.execute(new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
responses.add(searchResponse);
latch.countDown();
}
@Override
public void onFailure(Throwable e) {
responses.add(e);
latch.countDown();
}
});
}
latch.await();
assertThat(responses.size(), equalTo(numberOfAsyncOps));
// validate all responses
for (Object response : responses) {
if (response instanceof SearchResponse) {
SearchResponse searchResponse = (SearchResponse) response;
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
assertTrue("got unexpected reason..." + failure.reason(), failure.reason().toLowerCase(Locale.ENGLISH).contains("rejected"));
}
} else {
Throwable t = (Throwable) response;
Throwable unwrap = ExceptionsHelper.unwrapCause(t);
if (unwrap instanceof SearchPhaseExecutionException) {
SearchPhaseExecutionException e = (SearchPhaseExecutionException) unwrap;
for (ShardSearchFailure failure : e.shardFailures()) {
assertTrue("got unexpected reason..." + failure.reason(), failure.reason().toLowerCase(Locale.ENGLISH).contains("rejected"));
}
} else {
throw new AssertionError("unexpected failure", (Throwable) response);
}
}
}
}
}