From 9aae2f593a89a8a0b79433f71bbc8cb30361bd2b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 25 Oct 2017 22:05:46 -0400 Subject: [PATCH] Avoid stack overflow on search phases When a search is executing locally over many shards, we can stack overflow during query phase execution. This happens due to callbacks that occur after a phase completes for a shard and we move to the same phase on another shard. If all the shards for the query are local to the local node then we will never go async and these callbacks will end up as recursive calls. With sufficiently many shards, this will end up as a stack overflow. This commit addresses this by truncating the stack by forking to another thread on the executor for the phase. Relates #27069 --- .../search/AbstractSearchAsyncAction.java | 2 +- .../action/search/InitialSearchPhase.java | 152 ++++++++++------ .../CanMatchPreFilterSearchPhaseTests.java | 43 ++++- .../action/search/SearchAsyncActionTests.java | 6 +- modules/reindex/build.gradle | 9 + .../index/reindex/RetryTests.java | 169 ++++++++++++------ 6 files changed, 263 insertions(+), 118 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c7f1fa5dc5c..1c25cd7ac37 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction exten ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests) { - super(name, request, shardsIts, logger, maxConcurrentShardRequests); + super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor); this.timeProvider = timeProvider; this.logger = logger; this.searchTransportService = searchTransportService; diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index a68d1d599c5..0da74242937 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -26,12 +26,15 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.transport.ConnectTransportException; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -45,18 +48,30 @@ import java.util.stream.Stream; */ abstract class InitialSearchPhase extends SearchPhase { private final SearchRequest request; + private final GroupShardsIterator toSkipShardsIts; private final GroupShardsIterator shardsIts; private final Logger logger; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); private final int maxConcurrentShardRequests; + private final Executor executor; InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger, - int maxConcurrentShardRequests) { + int maxConcurrentShardRequests, Executor executor) { super(name); this.request = request; - this.shardsIts = shardsIts; + final List toSkipIterators = new ArrayList<>(); + final List iterators = new ArrayList<>(); + for (final SearchShardIterator iterator : shardsIts) { + if (iterator.skip()) { + toSkipIterators.add(iterator); + } else { + iterators.add(iterator); + } + } + this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators); + this.shardsIts = new GroupShardsIterator<>(iterators); this.logger = logger; // we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up // it's number of active shards but use 1 as the default if no replica of a shard is active at this point. @@ -64,6 +79,7 @@ abstract class InitialSearchPhase extends // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size()); + this.executor = executor; } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, @@ -71,19 +87,19 @@ abstract class InitialSearchPhase extends // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(), - shardIt.getOriginalIndices()); + shardIt.getOriginalIndices()); onShardFailure(shardIndex, shardTarget, e); if (totalOps.incrementAndGet() == expectedTotalOps) { if (logger.isDebugEnabled()) { if (e != null && !TransportActions.isShardNotAvailableException(e)) { logger.debug( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}]", - shard != null ? shard.shortSummary() : - shardIt.shardId(), - request), - e); + (Supplier) () -> new ParameterizedMessage( + "{}: Failed to execute [{}]", + shard != null ? shard.shortSummary() : + shardIt.shardId(), + request), + e); } else if (logger.isTraceEnabled()) { logger.trace((Supplier) () -> new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e); } @@ -94,32 +110,27 @@ abstract class InitialSearchPhase extends final boolean lastShard = nextShard == null; // trace log this exception logger.trace( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}] lastShard [{}]", - shard != null ? shard.shortSummary() : shardIt.shardId(), - request, - lastShard), - e); + (Supplier) () -> new ParameterizedMessage( + "{}: Failed to execute [{}] lastShard [{}]", + shard != null ? shard.shortSummary() : shardIt.shardId(), + request, + lastShard), + e); if (!lastShard) { - try { - performPhaseOnShard(shardIndex, shardIt, nextShard); - } catch (Exception inner) { - inner.addSuppressed(e); - onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner); - } + performPhaseOnShard(shardIndex, shardIt, nextShard); } else { maybeExecuteNext(); // move to the next execution if needed // no more shards active, add a failure if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception if (e != null && !TransportActions.isShardNotAvailableException(e)) { logger.debug( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}] lastShard [{}]", - shard != null ? shard.shortSummary() : - shardIt.shardId(), - request, - lastShard), - e); + (Supplier) () -> new ParameterizedMessage( + "{}: Failed to execute [{}] lastShard [{}]", + shard != null ? shard.shortSummary() : + shardIt.shardId(), + request, + lastShard), + e); } } } @@ -128,14 +139,18 @@ abstract class InitialSearchPhase extends @Override public final void run() throws IOException { - boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); - assert success; - for (int i = 0; i < maxConcurrentShardRequests; i++) { - SearchShardIterator shardRoutings = shardsIts.get(i); - if (shardRoutings.skip()) { - skipShard(shardRoutings); - } else { - performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull()); + for (final SearchShardIterator iterator : toSkipShardsIts) { + assert iterator.skip(); + skipShard(iterator); + } + if (shardsIts.size() > 0) { + int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size()); + final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); + assert success; + for (int index = 0; index < maxConcurrentShardRequests; index++) { + final SearchShardIterator shardRoutings = shardsIts.get(index); + assert shardRoutings.skip() == false; + performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); } } } @@ -143,38 +158,71 @@ abstract class InitialSearchPhase extends private void maybeExecuteNext() { final int index = shardExecutionIndex.getAndIncrement(); if (index < shardsIts.size()) { - SearchShardIterator shardRoutings = shardsIts.get(index); - if (shardRoutings.skip()) { - skipShard(shardRoutings); - } else { - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); - } + final SearchShardIterator shardRoutings = shardsIts.get(index); + performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); } } + private void maybeFork(final Thread thread, final Runnable runnable) { + if (thread == Thread.currentThread()) { + fork(runnable); + } else { + runnable.run(); + } + } + + private void fork(final Runnable runnable) { + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + + } + + @Override + protected void doRun() throws Exception { + runnable.run(); + } + + @Override + public boolean isForceExecution() { + // we can not allow a stuffed queue to reject execution here + return true; + } + }); + } + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { + /* + * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the + * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we + * continue on the same thread in the case that we never went async and this happens repeatedly we will end up recursing deeply and + * could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise + * we can continue (cf. InitialSearchPhase#maybeFork). + */ + final Thread thread = Thread.currentThread(); if (shard == null) { - onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { try { executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { @Override public void innerOnResponse(FirstResult result) { - onShardResult(result, shardIt); + maybeFork(thread, () -> onShardResult(result, shardIt)); } @Override public void onFailure(Exception t) { - onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t); + maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t)); } }); - } catch (ConnectTransportException | IllegalArgumentException ex) { - // we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to - // the next shard. previously when using discovery nodes here we had a special case for null when a node was not connected - // at all which is not not needed anymore. - onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, ex); + } catch (final Exception e) { + /* + * It is possible to run into connection exceptions here because we are getting the connection early and might run in to + * nodes that are not connected. In this case, on shard failure will move us to the next shard copy. + */ + fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); } } } @@ -204,7 +252,7 @@ abstract class InitialSearchPhase extends } else if (xTotalOps > expectedTotalOps) { throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]"); - } else { + } else if (shardsIt.skip() == false) { maybeExecuteNext(); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 373173a1fc6..9e0b4f7fee9 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -24,9 +24,12 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.test.ESTestCase; @@ -38,11 +41,12 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { - public void testFilterShards() throws InterruptedException { final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), @@ -185,6 +189,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + final SearchTransportService searchTransportService = new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override @@ -197,11 +202,11 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { } }; - final AtomicReference> result = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()); final GroupShardsIterator shardsIter = - SearchAsyncActionTests.getShardsIter("idx", originalIndices, 2048, randomBoolean(), primaryNode, replicaNode); + SearchAsyncActionTests.getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode); + final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors())); final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -215,16 +220,38 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { timeProvider, 0, null, - (iter) -> new SearchPhase("test") { + (iter) -> new InitialSearchPhase("test", null, iter, logger, randomIntBetween(1, 32), executor) { @Override - public void run() throws IOException { - result.set(iter); + void onPhaseDone() { latch.countDown(); - }}); + } + + @Override + void onShardFailure(final int shardIndex, final SearchShardTarget shardTarget, final Exception ex) { + + } + + @Override + void onShardSuccess(final SearchPhaseResult result) { + + } + + @Override + protected void executePhaseOnShard( + final SearchShardIterator shardIt, + final ShardRouting shard, + final SearchActionListener listener) { + if (randomBoolean()) { + listener.onResponse(new SearchPhaseResult() {}); + } else { + listener.onFailure(new Exception("failure")); + } + } + }); canMatchPhase.start(); latch.await(); - + executor.shutdown(); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index b9602f26346..8a9c98395d7 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -50,6 +50,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -285,6 +287,7 @@ public class SearchAsyncActionTests extends ESTestCase { lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); + final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors())); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction( "test", @@ -295,7 +298,7 @@ public class SearchAsyncActionTests extends ESTestCase { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - null, + executor, request, responseListener, shardsIter, @@ -349,6 +352,7 @@ public class SearchAsyncActionTests extends ESTestCase { } else { assertTrue(nodeToContextMap.get(replicaNode).toString(), nodeToContextMap.get(replicaNode).isEmpty()); } + executor.shutdown(); } static GroupShardsIterator getShardsIter(String index, OriginalIndices originalIndices, int numShards, diff --git a/modules/reindex/build.gradle b/modules/reindex/build.gradle index 3de4d3f5427..f29daf79912 100644 --- a/modules/reindex/build.gradle +++ b/modules/reindex/build.gradle @@ -35,6 +35,15 @@ run { setting 'reindex.remote.whitelist', '127.0.0.1:*' } +test { + /* + * We have to disable setting the number of available processors as tests in the + * same JVM randomize processors and will step on each other if we allow them to + * set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} + dependencies { compile "org.elasticsearch.client:elasticsearch-rest-client:${version}" // for http - testing reindex from remote diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index 9b19c572c0b..da0dbf2aae3 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.Retry; +import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -33,16 +34,17 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Netty4Plugin; import org.junit.After; -import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CyclicBarrier; +import java.util.function.Function; import static java.util.Collections.emptyMap; import static org.elasticsearch.index.reindex.ReindexTestCase.matcher; @@ -51,32 +53,15 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; /** - * Integration test for retry behavior. Useful because retrying relies on the way that the rest of Elasticsearch throws exceptions and unit - * tests won't verify that. + * Integration test for retry behavior. Useful because retrying relies on the way that the + * rest of Elasticsearch throws exceptions and unit tests won't verify that. */ -public class RetryTests extends ESSingleNodeTestCase { +public class RetryTests extends ESIntegTestCase { private static final int DOC_COUNT = 20; private List blockedExecutors = new ArrayList<>(); - - @Before - public void setUp() throws Exception { - super.setUp(); - createIndex("source"); - // Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools. - BulkRequestBuilder bulk = client().prepareBulk(); - for (int i = 0; i < DOC_COUNT; i++) { - bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i)); - } - - Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool()); - BulkResponse response = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet(); - assertFalse(response.buildFailureMessage(), response.hasFailures()); - client().admin().indices().prepareRefresh("source").get(); - } - @After public void forceUnblockAllExecutors() { for (CyclicBarrier barrier: blockedExecutors) { @@ -85,8 +70,15 @@ public class RetryTests extends ESSingleNodeTestCase { } @Override - protected Collection> getPlugins() { - return pluginList( + protected Collection> nodePlugins() { + return Arrays.asList( + ReindexPlugin.class, + Netty4Plugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList( ReindexPlugin.class, Netty4Plugin.class); } @@ -95,63 +87,123 @@ public class RetryTests extends ESSingleNodeTestCase { * Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried. */ @Override - protected Settings nodeSettings() { - Settings.Builder settings = Settings.builder().put(super.nodeSettings()); - // Use pools of size 1 so we can block them - settings.put("thread_pool.bulk.size", 1); - settings.put("thread_pool.search.size", 1); - // Use queues of size 1 because size 0 is broken and because search requests need the queue to function - settings.put("thread_pool.bulk.queue_size", 1); - settings.put("thread_pool.search.queue_size", 1); - // Enable http so we can test retries on reindex from remote. In this case the "remote" cluster is just this cluster. - settings.put(NetworkModule.HTTP_ENABLED.getKey(), true); - // Whitelist reindexing from the http host we're going to use - settings.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "127.0.0.1:*"); - return settings.build(); + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nodeSettings()).build(); + } + + final Settings nodeSettings() { + return Settings.builder() + // enable HTTP so we can test retries on reindex from remote; in this case the "remote" cluster is just this cluster + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + // whitelist reindexing from the HTTP host we're going to use + .put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "127.0.0.1:*") + .build(); } public void testReindex() throws Exception { - testCase(ReindexAction.NAME, ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest"), + testCase( + ReindexAction.NAME, + client -> ReindexAction.INSTANCE.newRequestBuilder(client).source("source").destination("dest"), matcher().created(DOC_COUNT)); } public void testReindexFromRemote() throws Exception { - NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0); - TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress(); - RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, - null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); - ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") - .setRemoteInfo(remote); - testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT)); + Function> function = client -> { + /* + * Use the master node for the reindex from remote because that node + * doesn't have a copy of the data on it. + */ + NodeInfo masterNode = null; + for (NodeInfo candidate : client.admin().cluster().prepareNodesInfo().get().getNodes()) { + if (candidate.getNode().isMasterNode()) { + masterNode = candidate; + } + } + assertNotNull(masterNode); + + TransportAddress address = masterNode.getHttp().getAddress().publishAddress(); + RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, + null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); + ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client).source("source").destination("dest") + .setRemoteInfo(remote); + return request; + }; + testCase(ReindexAction.NAME, function, matcher().created(DOC_COUNT)); } public void testUpdateByQuery() throws Exception { - testCase(UpdateByQueryAction.NAME, UpdateByQueryAction.INSTANCE.newRequestBuilder(client()).source("source"), + testCase(UpdateByQueryAction.NAME, client -> UpdateByQueryAction.INSTANCE.newRequestBuilder(client).source("source"), matcher().updated(DOC_COUNT)); } public void testDeleteByQuery() throws Exception { - testCase(DeleteByQueryAction.NAME, DeleteByQueryAction.INSTANCE.newRequestBuilder(client()).source("source") + testCase(DeleteByQueryAction.NAME, client -> DeleteByQueryAction.INSTANCE.newRequestBuilder(client).source("source") .filter(QueryBuilders.matchAllQuery()), matcher().deleted(DOC_COUNT)); } - private void testCase(String action, AbstractBulkByScrollRequestBuilder request, BulkIndexByScrollResponseMatcher matcher) + private void testCase( + String action, + Function> request, + BulkIndexByScrollResponseMatcher matcher) throws Exception { - logger.info("Blocking search"); - CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH); + /* + * These test cases work by stuffing the search and bulk queues of a single node and + * making sure that we read and write from that node. Because of some "fun" with the + * way that searches work, we need at least one more node to act as the coordinating + * node for the search request. If we didn't do this then the searches would get stuck + * in the queue anyway because we force queue portions of the coordinating node's + * actions. This is not a big deal in normal operations but a real pain when you are + * intentionally stuffing queues hoping for a failure. + */ + final Settings nodeSettings = Settings.builder() + // use pools of size 1 so we can block them + .put("thread_pool.bulk.size", 1) + .put("thread_pool.search.size", 1) + // use queues of size 1 because size 0 is broken and because search requests need the queue to function + .put("thread_pool.bulk.queue_size", 1) + .put("thread_pool.search.queue_size", 1) + .put("node.attr.color", "blue") + .build(); + final String node = internalCluster().startDataOnlyNode(nodeSettings); + final Settings indexSettings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.include.color", "blue") + .build(); + + // Create the source index on the node with small thread pools so we can block them. + client().admin().indices().prepareCreate("source").setSettings(indexSettings).execute().actionGet(); + // Not all test cases use the dest index but those that do require that it be on the node will small thread pools + client().admin().indices().prepareCreate("dest").setSettings(indexSettings).execute().actionGet(); + // Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools. + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < DOC_COUNT; i++) { + bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i)); + } + + Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool()); + BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet(); + assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures()); + client().admin().indices().prepareRefresh("source").get(); + + logger.info("Blocking search"); + CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH, node); + + AbstractBulkByScrollRequestBuilder builder = request.apply(internalCluster().masterClient()); // Make sure we use more than one batch so we have to scroll - request.source().setSize(DOC_COUNT / randomIntBetween(2, 10)); + builder.source().setSize(DOC_COUNT / randomIntBetween(2, 10)); logger.info("Starting request"); - ActionFuture responseListener = request.execute(); + ActionFuture responseListener = builder.execute(); try { logger.info("Waiting for search rejections on the initial search"); assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L))); logger.info("Blocking bulk and unblocking search so we start to get bulk rejections"); - CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK); + CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK, node); initialSearchBlock.await(); logger.info("Waiting for bulk rejections"); @@ -161,7 +213,7 @@ public class RetryTests extends ESSingleNodeTestCase { long initialSearchRejections = taskStatus(action).getSearchRetries(); logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll"); - CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH); + CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH, node); bulkBlock.await(); logger.info("Waiting for search rejections for the scroll"); @@ -187,8 +239,8 @@ public class RetryTests extends ESSingleNodeTestCase { * Blocks the named executor by getting its only thread running a task blocked on a CyclicBarrier and fills the queue with a noop task. * So requests to use this queue should get {@link EsRejectedExecutionException}s. */ - private CyclicBarrier blockExecutor(String name) throws Exception { - ThreadPool threadPool = getInstanceFromNode(ThreadPool.class); + private CyclicBarrier blockExecutor(String name, String node) throws Exception { + ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, node); CyclicBarrier barrier = new CyclicBarrier(2); logger.info("Blocking the [{}] executor", name); threadPool.executor(name).execute(() -> { @@ -211,6 +263,11 @@ public class RetryTests extends ESSingleNodeTestCase { * Fetch the status for a task of type "action". Fails if there aren't exactly one of that type of task running. */ private BulkByScrollTask.Status taskStatus(String action) { + /* + * We always use the master client because we always start the test requests on the + * master. We do this simply to make sure that the test request is not started on the + * node who's queue we're manipulating. + */ ListTasksResponse response = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get(); assertThat(response.getTasks(), hasSize(1)); return (BulkByScrollTask.Status) response.getTasks().get(0).getStatus();