Avoid stack overflow on search phases

When a search executes locally over many shards, we can overflow the
stack during query phase execution. This happens due to callbacks that
occur after a phase completes for a shard and we move to the same phase
on another shard. If all the shards for the query are on the local node
then we never go async and these callbacks end up as recursive calls.
With sufficiently many shards, the recursion overflows the stack. This
commit addresses this by truncating the stack: when a phase would
otherwise continue on the thread it started on, we fork the
continuation to another thread on the executor for the phase.

Relates #27069
Jason Tedor authored 2017-10-25 22:05:46 -04:00, committed by GitHub
parent b31028e8c4, commit 9aae2f593a
6 changed files with 263 additions and 118 deletions
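
To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative names, not Elasticsearch code): a completion callback that never goes async is just recursion in disguise, so with enough shards the chain overflows the stack.

import java.util.function.IntConsumer;

class RecursiveCallbacks {
    public static void main(String[] args) {
        // "the phase completed for shard i, run the same phase on shard i + 1"
        final IntConsumer[] onPhaseDone = new IntConsumer[1];
        onPhaseDone[0] = shard -> {
            if (shard < 1_000_000) {
                onPhaseDone[0].accept(shard + 1); // synchronous callback = one more stack frame
            }
        };
        try {
            onPhaseDone[0].accept(0);
        } catch (StackOverflowError e) {
            System.out.println("overflowed long before shard 1,000,000");
        }
    }
}

The fix in this commit breaks such chains by handing the continuation to the phase's executor whenever the callback arrives on the thread that started the request (see maybeFork in InitialSearchPhase below).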

File: AbstractSearchAsyncAction.java

@@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests) {
super(name, request, shardsIts, logger, maxConcurrentShardRequests);
super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor);
this.timeProvider = timeProvider;
this.logger = logger;
this.searchTransportService = searchTransportService;

File: InitialSearchPhase.java

@@ -26,12 +26,15 @@ import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.ConnectTransportException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
@@ -45,18 +48,30 @@ import java.util.stream.Stream;
*/
abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends SearchPhase {
private final SearchRequest request;
private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
private final Logger logger;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
private final AtomicInteger shardExecutionIndex = new AtomicInteger(0);
private final int maxConcurrentShardRequests;
private final Executor executor;
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger,
int maxConcurrentShardRequests) {
int maxConcurrentShardRequests, Executor executor) {
super(name);
this.request = request;
this.shardsIts = shardsIts;
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
final List<SearchShardIterator> iterators = new ArrayList<>();
for (final SearchShardIterator iterator : shardsIts) {
if (iterator.skip()) {
toSkipIterators.add(iterator);
} else {
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
this.logger = logger;
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// its number of active shards but use 1 as the default if no replica of a shard is active at this point.
@@ -64,6 +79,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
// we process hence we add one for the non active partition here.
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size());
this.executor = executor;
}
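
As a worked example of the accounting described in the comment above (assuming totalSizeWith1ForEmpty sums, per shard, the number of copies in its iterator but counts at least 1 for a shard with no active copy; the helper below is an illustrative sketch, not the actual GroupShardsIterator method):

import java.util.Arrays;
import java.util.List;

class ExpectedOpsSketch {
    // copiesPerShard.get(i) = number of active copies of shard i
    static int totalSizeWith1ForEmpty(List<Integer> copiesPerShard) {
        int total = 0;
        for (int copies : copiesPerShard) {
            total += Math.max(copies, 1); // a shard with no active copy still counts once
        }
        return total;
    }

    public static void main(String[] args) {
        // three shards: two copies, one copy, and none active => 2 + 1 + 1 = 4
        System.out.println(totalSizeWith1ForEmpty(Arrays.asList(2, 1, 0)));
    }
}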
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
@@ -71,19 +87,19 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(),
shardIt.getOriginalIndices());
onShardFailure(shardIndex, shardTarget, e);
if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}]",
shard != null ? shard.shortSummary() :
shardIt.shardId(),
request),
e);
} else if (logger.isTraceEnabled()) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
}
@@ -94,32 +110,27 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
final boolean lastShard = nextShard == null;
// trace log this exception
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() : shardIt.shardId(),
request,
lastShard),
e);
if (!lastShard) {
try {
performPhaseOnShard(shardIndex, shardIt, nextShard);
} catch (Exception inner) {
inner.addSuppressed(e);
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
}
performPhaseOnShard(shardIndex, shardIt, nextShard);
} else {
maybeExecuteNext(); // move to the next execution if needed
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() :
shardIt.shardId(),
request,
lastShard),
e);
}
}
}
@@ -128,14 +139,18 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
@Override
public final void run() throws IOException {
boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
for (int i = 0; i < maxConcurrentShardRequests; i++) {
SearchShardIterator shardRoutings = shardsIts.get(i);
if (shardRoutings.skip()) {
skipShard(shardRoutings);
} else {
performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull());
for (final SearchShardIterator iterator : toSkipShardsIts) {
assert iterator.skip();
skipShard(iterator);
}
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
for (int index = 0; index < maxConcurrentShardRequests; index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}
}
@@ -143,38 +158,71 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
private void maybeExecuteNext() {
final int index = shardExecutionIndex.getAndIncrement();
if (index < shardsIts.size()) {
SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.skip()) {
skipShard(shardRoutings);
} else {
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
final SearchShardIterator shardRoutings = shardsIts.get(index);
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}
private void maybeFork(final Thread thread, final Runnable runnable) {
if (thread == Thread.currentThread()) {
fork(runnable);
} else {
runnable.run();
}
}
private void fork(final Runnable runnable) {
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
}
@Override
protected void doRun() throws Exception {
runnable.run();
}
@Override
public boolean isForceExecution() {
// we can not allow a stuffed queue to reject execution here
return true;
}
});
}
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
* same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
* continue on the same thread in the case that we never went async and this happens repeatedly we will end up recursing deeply and
* could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise
* we can continue (cf. InitialSearchPhase#maybeFork).
*/
final Thread thread = Thread.currentThread();
if (shard == null) {
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
} else {
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
onShardResult(result, shardIt);
maybeFork(thread, () -> onShardResult(result, shardIt));
}
@Override
public void onFailure(Exception t) {
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
}
});
} catch (ConnectTransportException | IllegalArgumentException ex) {
// we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to
// the next shard. previously when using discovery nodes here we had a special case for null when a node was not connected
// at all which is not needed anymore.
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
} catch (final Exception e) {
/*
* It is possible to run into connection exceptions here because we are getting the connection early and might run in to
* nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
}
}
}
@@ -204,7 +252,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
} else {
} else if (shardsIt.skip() == false) {
maybeExecuteNext();
}
}
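
The forking logic above can be sketched in isolation (illustrative names, not the actual Elasticsearch code): capture the thread the phase starts on, and when the listener fires, fork to the executor only if we are still on that captured thread; otherwise a thread hop has already reset the stack and we can continue inline.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

class MaybeForkSketch {

    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2);

    // simulates executing the phase on one shard: a "local" shard completes
    // synchronously on the calling thread, a "remote" one on the executor
    private static void executePhaseOnShard(Runnable onResponse) {
        if (ThreadLocalRandom.current().nextBoolean()) {
            onResponse.run();
        } else {
            EXECUTOR.execute(onResponse);
        }
    }

    private static void performPhaseOnShard(int shardIndex, int totalShards, CountDownLatch done) {
        if (shardIndex == totalShards) {
            done.countDown();
            return;
        }
        final Thread thread = Thread.currentThread(); // capture the starting thread
        executePhaseOnShard(() -> {
            final Runnable next = () -> performPhaseOnShard(shardIndex + 1, totalShards, done);
            if (Thread.currentThread() == thread) {
                EXECUTOR.execute(next); // called back synchronously: fork to truncate the stack
            } else {
                next.run(); // we already hopped threads, the stack is fresh
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);
        performPhaseOnShard(0, 100_000, done); // completes without StackOverflowError
        done.await();
        EXECUTOR.shutdown();
    }
}

Note that the real fork submits an AbstractRunnable whose isForceExecution() returns true: if a stuffed queue could reject the continuation, the search would never complete.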

File: CanMatchPreFilterSearchPhaseTests.java

@@ -24,9 +24,12 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.test.ESTestCase;
@@ -38,11 +41,12 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
public void testFilterShards() throws InterruptedException {
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(),
@@ -185,6 +189,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
final SearchTransportService searchTransportService =
new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
@@ -197,11 +202,11 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
}
};
final AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed());
final GroupShardsIterator<SearchShardIterator> shardsIter =
SearchAsyncActionTests.getShardsIter("idx", originalIndices, 2048, randomBoolean(), primaryNode, replicaNode);
SearchAsyncActionTests.getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode);
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors()));
final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(
logger,
searchTransportService,
@@ -215,16 +220,38 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
timeProvider,
0,
null,
(iter) -> new SearchPhase("test") {
(iter) -> new InitialSearchPhase<SearchPhaseResult>("test", null, iter, logger, randomIntBetween(1, 32), executor) {
@Override
public void run() throws IOException {
result.set(iter);
void onPhaseDone() {
latch.countDown();
}});
}
@Override
void onShardFailure(final int shardIndex, final SearchShardTarget shardTarget, final Exception ex) {
}
@Override
void onShardSuccess(final SearchPhaseResult result) {
}
@Override
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
if (randomBoolean()) {
listener.onResponse(new SearchPhaseResult() {});
} else {
listener.onFailure(new Exception("failure"));
}
}
});
canMatchPhase.start();
latch.await();
executor.shutdown();
}
}

File: SearchAsyncActionTests.java

@@ -50,6 +50,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -285,6 +287,7 @@ public class SearchAsyncActionTests extends ESTestCase {
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors()));
AbstractSearchAsyncAction asyncAction =
new AbstractSearchAsyncAction<TestSearchPhaseResult>(
"test",
@@ -295,7 +298,7 @@ public class SearchAsyncActionTests extends ESTestCase {
return lookup.get(node); },
aliasFilters,
Collections.emptyMap(),
null,
executor,
request,
responseListener,
shardsIter,
@@ -349,6 +352,7 @@ public class SearchAsyncActionTests extends ESTestCase {
} else {
assertTrue(nodeToContextMap.get(replicaNode).toString(), nodeToContextMap.get(replicaNode).isEmpty());
}
executor.shutdown();
}
static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,

File: build.gradle

@@ -35,6 +35,15 @@ run {
setting 'reindex.remote.whitelist', '127.0.0.1:*'
}
test {
/*
* We have to disable setting the number of available processors as tests in the
* same JVM randomize processors and will step on each other if we allow them to
* set the number of available processors as it's set-once in Netty.
*/
systemProperty 'es.set.netty.runtime.available.processors', 'false'
}
dependencies {
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
// for http - testing reindex from remote

File: RetryTests.java

@@ -26,6 +26,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
@@ -33,16 +34,17 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.function.Function;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.reindex.ReindexTestCase.matcher;
@@ -51,32 +53,15 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
/**
* Integration test for retry behavior. Useful because retrying relies on the way that the
* rest of Elasticsearch throws exceptions and unit tests won't verify that.
*/
public class RetryTests extends ESSingleNodeTestCase {
public class RetryTests extends ESIntegTestCase {
private static final int DOC_COUNT = 20;
private List<CyclicBarrier> blockedExecutors = new ArrayList<>();
@Before
public void setUp() throws Exception {
super.setUp();
createIndex("source");
// Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools.
BulkRequestBuilder bulk = client().prepareBulk();
for (int i = 0; i < DOC_COUNT; i++) {
bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i));
}
Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool());
BulkResponse response = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet();
assertFalse(response.buildFailureMessage(), response.hasFailures());
client().admin().indices().prepareRefresh("source").get();
}
@After
public void forceUnblockAllExecutors() {
for (CyclicBarrier barrier: blockedExecutors) {
@@ -85,8 +70,15 @@ public class RetryTests extends ESSingleNodeTestCase {
}
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(
ReindexPlugin.class,
Netty4Plugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(
ReindexPlugin.class,
Netty4Plugin.class);
}
@@ -95,63 +87,123 @@ public class RetryTests extends ESSingleNodeTestCase {
* Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried.
*/
@Override
protected Settings nodeSettings() {
Settings.Builder settings = Settings.builder().put(super.nodeSettings());
// Use pools of size 1 so we can block them
settings.put("thread_pool.bulk.size", 1);
settings.put("thread_pool.search.size", 1);
// Use queues of size 1 because size 0 is broken and because search requests need the queue to function
settings.put("thread_pool.bulk.queue_size", 1);
settings.put("thread_pool.search.queue_size", 1);
// Enable http so we can test retries on reindex from remote. In this case the "remote" cluster is just this cluster.
settings.put(NetworkModule.HTTP_ENABLED.getKey(), true);
// Whitelist reindexing from the http host we're going to use
settings.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "127.0.0.1:*");
return settings.build();
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nodeSettings()).build();
}
final Settings nodeSettings() {
return Settings.builder()
// enable HTTP so we can test retries on reindex from remote; in this case the "remote" cluster is just this cluster
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
// whitelist reindexing from the HTTP host we're going to use
.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "127.0.0.1:*")
.build();
}
public void testReindex() throws Exception {
testCase(ReindexAction.NAME, ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest"),
testCase(
ReindexAction.NAME,
client -> ReindexAction.INSTANCE.newRequestBuilder(client).source("source").destination("dest"),
matcher().created(DOC_COUNT));
}
public void testReindexFromRemote() throws Exception {
NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0);
TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress();
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote);
testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT));
Function<Client, AbstractBulkByScrollRequestBuilder<?, ?>> function = client -> {
/*
* Use the master node for the reindex from remote because that node
* doesn't have a copy of the data on it.
*/
NodeInfo masterNode = null;
for (NodeInfo candidate : client.admin().cluster().prepareNodesInfo().get().getNodes()) {
if (candidate.getNode().isMasterNode()) {
masterNode = candidate;
}
}
assertNotNull(masterNode);
TransportAddress address = masterNode.getHttp().getAddress().publishAddress();
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client).source("source").destination("dest")
.setRemoteInfo(remote);
return request;
};
testCase(ReindexAction.NAME, function, matcher().created(DOC_COUNT));
}
public void testUpdateByQuery() throws Exception {
testCase(UpdateByQueryAction.NAME, UpdateByQueryAction.INSTANCE.newRequestBuilder(client()).source("source"),
testCase(UpdateByQueryAction.NAME, client -> UpdateByQueryAction.INSTANCE.newRequestBuilder(client).source("source"),
matcher().updated(DOC_COUNT));
}
public void testDeleteByQuery() throws Exception {
testCase(DeleteByQueryAction.NAME, DeleteByQueryAction.INSTANCE.newRequestBuilder(client()).source("source")
testCase(DeleteByQueryAction.NAME, client -> DeleteByQueryAction.INSTANCE.newRequestBuilder(client).source("source")
.filter(QueryBuilders.matchAllQuery()), matcher().deleted(DOC_COUNT));
}
private void testCase(String action, AbstractBulkByScrollRequestBuilder<?, ?> request, BulkIndexByScrollResponseMatcher matcher)
private void testCase(
String action,
Function<Client, AbstractBulkByScrollRequestBuilder<?, ?>> request,
BulkIndexByScrollResponseMatcher matcher)
throws Exception {
logger.info("Blocking search");
CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH);
/*
* These test cases work by stuffing the search and bulk queues of a single node and
* making sure that we read and write from that node. Because of some "fun" with the
* way that searches work, we need at least one more node to act as the coordinating
* node for the search request. If we didn't do this then the searches would get stuck
* in the queue anyway because we force queue portions of the coordinating node's
* actions. This is not a big deal in normal operations but a real pain when you are
* intentionally stuffing queues hoping for a failure.
*/
final Settings nodeSettings = Settings.builder()
// use pools of size 1 so we can block them
.put("thread_pool.bulk.size", 1)
.put("thread_pool.search.size", 1)
// use queues of size 1 because size 0 is broken and because search requests need the queue to function
.put("thread_pool.bulk.queue_size", 1)
.put("thread_pool.search.queue_size", 1)
.put("node.attr.color", "blue")
.build();
final String node = internalCluster().startDataOnlyNode(nodeSettings);
final Settings indexSettings =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.routing.allocation.include.color", "blue")
.build();
// Create the source index on the node with small thread pools so we can block them.
client().admin().indices().prepareCreate("source").setSettings(indexSettings).execute().actionGet();
// Not all test cases use the dest index but those that do require that it be on the node with small thread pools.
client().admin().indices().prepareCreate("dest").setSettings(indexSettings).execute().actionGet();
// Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools.
BulkRequestBuilder bulk = client().prepareBulk();
for (int i = 0; i < DOC_COUNT; i++) {
bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i));
}
Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool());
BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet();
assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
client().admin().indices().prepareRefresh("source").get();
logger.info("Blocking search");
CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH, node);
AbstractBulkByScrollRequestBuilder<?, ?> builder = request.apply(internalCluster().masterClient());
// Make sure we use more than one batch so we have to scroll
request.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
builder.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
logger.info("Starting request");
ActionFuture<BulkByScrollResponse> responseListener = request.execute();
ActionFuture<BulkByScrollResponse> responseListener = builder.execute();
try {
logger.info("Waiting for search rejections on the initial search");
assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L)));
logger.info("Blocking bulk and unblocking search so we start to get bulk rejections");
CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK);
CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK, node);
initialSearchBlock.await();
logger.info("Waiting for bulk rejections");
@@ -161,7 +213,7 @@ public class RetryTests extends ESSingleNodeTestCase {
long initialSearchRejections = taskStatus(action).getSearchRetries();
logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll");
CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH);
CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH, node);
bulkBlock.await();
logger.info("Waiting for search rejections for the scroll");
@@ -187,8 +239,8 @@ public class RetryTests extends ESSingleNodeTestCase {
* Blocks the named executor by having its only thread run a task that blocks on a CyclicBarrier, and fills the queue with a noop task,
* so requests to use this executor should get {@link EsRejectedExecutionException}s.
*/
private CyclicBarrier blockExecutor(String name) throws Exception {
ThreadPool threadPool = getInstanceFromNode(ThreadPool.class);
private CyclicBarrier blockExecutor(String name, String node) throws Exception {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, node);
CyclicBarrier barrier = new CyclicBarrier(2);
logger.info("Blocking the [{}] executor", name);
threadPool.executor(name).execute(() -> {
@@ -211,6 +263,11 @@ public class RetryTests extends ESSingleNodeTestCase {
* Fetch the status for a task of type "action". Fails unless exactly one task of that type is running.
*/
private BulkByScrollTask.Status taskStatus(String action) {
/*
* We always use the master client because we always start the test requests on the
* master. We do this simply to make sure that the test request is not started on the
* node whose queue we're manipulating.
*/
ListTasksResponse response = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get();
assertThat(response.getTasks(), hasSize(1));
return (BulkByScrollTask.Status) response.getTasks().get(0).getStatus();
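
The blockExecutor trick documented above generalizes beyond these tests: park the pool's only worker on a CyclicBarrier, then occupy the single queue slot, and any further submission is rejected. A self-contained sketch with plain java.util.concurrent types (Elasticsearch's thread pools reject with EsRejectedExecutionException rather than the JDK exception used here):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BlockExecutorSketch {
    public static void main(String[] args) throws Exception {
        // size-1 pool with a size-1 queue, like the test's node settings
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CyclicBarrier barrier = new CyclicBarrier(2);
        executor.execute(() -> await(barrier)); // park the only worker on the barrier
        executor.execute(() -> {});             // fill the only queue slot with a noop
        try {
            executor.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            System.out.println("rejected as expected: the pool is stuffed");
        }
        barrier.await(); // unblock the worker, mirroring forceUnblockAllExecutors
        executor.shutdown();
    }

    private static void await(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}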