diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c7f1fa5dc5c..1c25cd7ac37 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction exten ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests) { - super(name, request, shardsIts, logger, maxConcurrentShardRequests); + super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor); this.timeProvider = timeProvider; this.logger = logger; this.searchTransportService = searchTransportService; diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index a68d1d599c5..0da74242937 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -26,12 +26,15 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.transport.ConnectTransportException; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -45,18 +48,30 @@ import java.util.stream.Stream; */ abstract class InitialSearchPhase extends SearchPhase { private final SearchRequest request; + private final GroupShardsIterator toSkipShardsIts; private final GroupShardsIterator shardsIts; private final Logger logger; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); private final int maxConcurrentShardRequests; + private final Executor executor; InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger, - int maxConcurrentShardRequests) { + int maxConcurrentShardRequests, Executor executor) { super(name); this.request = request; - this.shardsIts = shardsIts; + final List toSkipIterators = new ArrayList<>(); + final List iterators = new ArrayList<>(); + for (final SearchShardIterator iterator : shardsIts) { + if (iterator.skip()) { + toSkipIterators.add(iterator); + } else { + iterators.add(iterator); + } + } + this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators); + this.shardsIts = new GroupShardsIterator<>(iterators); this.logger = logger; // we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up // it's number of active shards but use 1 as the default if no replica of a shard is active at this point. @@ -64,6 +79,7 @@ abstract class InitialSearchPhase extends // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size()); + this.executor = executor; } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, @@ -71,19 +87,19 @@ abstract class InitialSearchPhase extends // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(), - shardIt.getOriginalIndices()); + shardIt.getOriginalIndices()); onShardFailure(shardIndex, shardTarget, e); if (totalOps.incrementAndGet() == expectedTotalOps) { if (logger.isDebugEnabled()) { if (e != null && !TransportActions.isShardNotAvailableException(e)) { logger.debug( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}]", - shard != null ? shard.shortSummary() : - shardIt.shardId(), - request), - e); + (Supplier) () -> new ParameterizedMessage( + "{}: Failed to execute [{}]", + shard != null ? shard.shortSummary() : + shardIt.shardId(), + request), + e); } else if (logger.isTraceEnabled()) { logger.trace((Supplier) () -> new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e); } @@ -94,32 +110,27 @@ abstract class InitialSearchPhase extends final boolean lastShard = nextShard == null; // trace log this exception logger.trace( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}] lastShard [{}]", - shard != null ? shard.shortSummary() : shardIt.shardId(), - request, - lastShard), - e); + (Supplier) () -> new ParameterizedMessage( + "{}: Failed to execute [{}] lastShard [{}]", + shard != null ? shard.shortSummary() : shardIt.shardId(), + request, + lastShard), + e); if (!lastShard) { - try { - performPhaseOnShard(shardIndex, shardIt, nextShard); - } catch (Exception inner) { - inner.addSuppressed(e); - onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner); - } + performPhaseOnShard(shardIndex, shardIt, nextShard); } else { maybeExecuteNext(); // move to the next execution if needed // no more shards active, add a failure if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception if (e != null && !TransportActions.isShardNotAvailableException(e)) { logger.debug( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}] lastShard [{}]", - shard != null ? shard.shortSummary() : - shardIt.shardId(), - request, - lastShard), - e); + (Supplier) () -> new ParameterizedMessage( + "{}: Failed to execute [{}] lastShard [{}]", + shard != null ? shard.shortSummary() : + shardIt.shardId(), + request, + lastShard), + e); } } } @@ -128,14 +139,18 @@ abstract class InitialSearchPhase extends @Override public final void run() throws IOException { - boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); - assert success; - for (int i = 0; i < maxConcurrentShardRequests; i++) { - SearchShardIterator shardRoutings = shardsIts.get(i); - if (shardRoutings.skip()) { - skipShard(shardRoutings); - } else { - performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull()); + for (final SearchShardIterator iterator : toSkipShardsIts) { + assert iterator.skip(); + skipShard(iterator); + } + if (shardsIts.size() > 0) { + int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size()); + final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests); + assert success; + for (int index = 0; index < maxConcurrentShardRequests; index++) { + final SearchShardIterator shardRoutings = shardsIts.get(index); + assert shardRoutings.skip() == false; + performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); } } } @@ -143,38 +158,71 @@ abstract class InitialSearchPhase extends private void maybeExecuteNext() { final int index = shardExecutionIndex.getAndIncrement(); if (index < shardsIts.size()) { - SearchShardIterator shardRoutings = shardsIts.get(index); - if (shardRoutings.skip()) { - skipShard(shardRoutings); - } else { - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); - } + final SearchShardIterator shardRoutings = shardsIts.get(index); + performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); } } + private void maybeFork(final Thread thread, final Runnable runnable) { + if (thread == Thread.currentThread()) { + fork(runnable); + } else { + runnable.run(); + } + } + + private void fork(final Runnable runnable) { + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + + } + + @Override + protected void doRun() throws Exception { + runnable.run(); + } + + @Override + public boolean isForceExecution() { + // we can not allow a stuffed queue to reject execution here + return true; + } + }); + } + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { + /* + * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the + * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we + * continue on the same thread in the case that we never went async and this happens repeatedly we will end up recursing deeply and + * could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise + * we can continue (cf. InitialSearchPhase#maybeFork). + */ + final Thread thread = Thread.currentThread(); if (shard == null) { - onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { try { executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { @Override public void innerOnResponse(FirstResult result) { - onShardResult(result, shardIt); + maybeFork(thread, () -> onShardResult(result, shardIt)); } @Override public void onFailure(Exception t) { - onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t); + maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t)); } }); - } catch (ConnectTransportException | IllegalArgumentException ex) { - // we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to - // the next shard. previously when using discovery nodes here we had a special case for null when a node was not connected - // at all which is not not needed anymore. - onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, ex); + } catch (final Exception e) { + /* + * It is possible to run into connection exceptions here because we are getting the connection early and might run in to + * nodes that are not connected. In this case, on shard failure will move us to the next shard copy. + */ + fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); } } } @@ -204,7 +252,7 @@ abstract class InitialSearchPhase extends } else if (xTotalOps > expectedTotalOps) { throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]"); - } else { + } else if (shardsIt.skip() == false) { maybeExecuteNext(); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 373173a1fc6..9e0b4f7fee9 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -24,9 +24,12 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.test.ESTestCase; @@ -38,11 +41,12 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { - public void testFilterShards() throws InterruptedException { final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), @@ -185,6 +189,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + final SearchTransportService searchTransportService = new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override @@ -197,11 +202,11 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { } }; - final AtomicReference> result = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()); final GroupShardsIterator shardsIter = - SearchAsyncActionTests.getShardsIter("idx", originalIndices, 2048, randomBoolean(), primaryNode, replicaNode); + SearchAsyncActionTests.getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode); + final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors())); final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -215,16 +220,38 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { timeProvider, 0, null, - (iter) -> new SearchPhase("test") { + (iter) -> new InitialSearchPhase("test", null, iter, logger, randomIntBetween(1, 32), executor) { @Override - public void run() throws IOException { - result.set(iter); + void onPhaseDone() { latch.countDown(); - }}); + } + + @Override + void onShardFailure(final int shardIndex, final SearchShardTarget shardTarget, final Exception ex) { + + } + + @Override + void onShardSuccess(final SearchPhaseResult result) { + + } + + @Override + protected void executePhaseOnShard( + final SearchShardIterator shardIt, + final ShardRouting shard, + final SearchActionListener listener) { + if (randomBoolean()) { + listener.onResponse(new SearchPhaseResult() {}); + } else { + listener.onFailure(new Exception("failure")); + } + } + }); canMatchPhase.start(); latch.await(); - + executor.shutdown(); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index b9602f26346..8a9c98395d7 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -50,6 +50,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -285,6 +287,7 @@ public class SearchAsyncActionTests extends ESTestCase { lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); + final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors())); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction( "test", @@ -295,7 +298,7 @@ public class SearchAsyncActionTests extends ESTestCase { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - null, + executor, request, responseListener, shardsIter, @@ -349,6 +352,7 @@ public class SearchAsyncActionTests extends ESTestCase { } else { assertTrue(nodeToContextMap.get(replicaNode).toString(), nodeToContextMap.get(replicaNode).isEmpty()); } + executor.shutdown(); } static GroupShardsIterator getShardsIter(String index, OriginalIndices originalIndices, int numShards, diff --git a/modules/reindex/build.gradle b/modules/reindex/build.gradle index 3de4d3f5427..f29daf79912 100644 --- a/modules/reindex/build.gradle +++ b/modules/reindex/build.gradle @@ -35,6 +35,15 @@ run { setting 'reindex.remote.whitelist', '127.0.0.1:*' } +test { + /* + * We have to disable setting the number of available processors as tests in the + * same JVM randomize processors and will step on each other if we allow them to + * set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} + dependencies { compile "org.elasticsearch.client:elasticsearch-rest-client:${version}" // for http - testing reindex from remote diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index 9b19c572c0b..da0dbf2aae3 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.Retry; +import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -33,16 +34,17 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Netty4Plugin; import org.junit.After; -import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CyclicBarrier; +import java.util.function.Function; import static java.util.Collections.emptyMap; import static org.elasticsearch.index.reindex.ReindexTestCase.matcher; @@ -51,32 +53,15 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; /** - * Integration test for retry behavior. Useful because retrying relies on the way that the rest of Elasticsearch throws exceptions and unit - * tests won't verify that. + * Integration test for retry behavior. Useful because retrying relies on the way that the + * rest of Elasticsearch throws exceptions and unit tests won't verify that. */ -public class RetryTests extends ESSingleNodeTestCase { +public class RetryTests extends ESIntegTestCase { private static final int DOC_COUNT = 20; private List blockedExecutors = new ArrayList<>(); - - @Before - public void setUp() throws Exception { - super.setUp(); - createIndex("source"); - // Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools. - BulkRequestBuilder bulk = client().prepareBulk(); - for (int i = 0; i < DOC_COUNT; i++) { - bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i)); - } - - Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool()); - BulkResponse response = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet(); - assertFalse(response.buildFailureMessage(), response.hasFailures()); - client().admin().indices().prepareRefresh("source").get(); - } - @After public void forceUnblockAllExecutors() { for (CyclicBarrier barrier: blockedExecutors) { @@ -85,8 +70,15 @@ public class RetryTests extends ESSingleNodeTestCase { } @Override - protected Collection> getPlugins() { - return pluginList( + protected Collection> nodePlugins() { + return Arrays.asList( + ReindexPlugin.class, + Netty4Plugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList( ReindexPlugin.class, Netty4Plugin.class); } @@ -95,63 +87,123 @@ public class RetryTests extends ESSingleNodeTestCase { * Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried. */ @Override - protected Settings nodeSettings() { - Settings.Builder settings = Settings.builder().put(super.nodeSettings()); - // Use pools of size 1 so we can block them - settings.put("thread_pool.bulk.size", 1); - settings.put("thread_pool.search.size", 1); - // Use queues of size 1 because size 0 is broken and because search requests need the queue to function - settings.put("thread_pool.bulk.queue_size", 1); - settings.put("thread_pool.search.queue_size", 1); - // Enable http so we can test retries on reindex from remote. In this case the "remote" cluster is just this cluster. - settings.put(NetworkModule.HTTP_ENABLED.getKey(), true); - // Whitelist reindexing from the http host we're going to use - settings.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "127.0.0.1:*"); - return settings.build(); + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nodeSettings()).build(); + } + + final Settings nodeSettings() { + return Settings.builder() + // enable HTTP so we can test retries on reindex from remote; in this case the "remote" cluster is just this cluster + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + // whitelist reindexing from the HTTP host we're going to use + .put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "127.0.0.1:*") + .build(); } public void testReindex() throws Exception { - testCase(ReindexAction.NAME, ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest"), + testCase( + ReindexAction.NAME, + client -> ReindexAction.INSTANCE.newRequestBuilder(client).source("source").destination("dest"), matcher().created(DOC_COUNT)); } public void testReindexFromRemote() throws Exception { - NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0); - TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress(); - RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, - null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); - ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") - .setRemoteInfo(remote); - testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT)); + Function> function = client -> { + /* + * Use the master node for the reindex from remote because that node + * doesn't have a copy of the data on it. + */ + NodeInfo masterNode = null; + for (NodeInfo candidate : client.admin().cluster().prepareNodesInfo().get().getNodes()) { + if (candidate.getNode().isMasterNode()) { + masterNode = candidate; + } + } + assertNotNull(masterNode); + + TransportAddress address = masterNode.getHttp().getAddress().publishAddress(); + RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, + null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); + ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client).source("source").destination("dest") + .setRemoteInfo(remote); + return request; + }; + testCase(ReindexAction.NAME, function, matcher().created(DOC_COUNT)); } public void testUpdateByQuery() throws Exception { - testCase(UpdateByQueryAction.NAME, UpdateByQueryAction.INSTANCE.newRequestBuilder(client()).source("source"), + testCase(UpdateByQueryAction.NAME, client -> UpdateByQueryAction.INSTANCE.newRequestBuilder(client).source("source"), matcher().updated(DOC_COUNT)); } public void testDeleteByQuery() throws Exception { - testCase(DeleteByQueryAction.NAME, DeleteByQueryAction.INSTANCE.newRequestBuilder(client()).source("source") + testCase(DeleteByQueryAction.NAME, client -> DeleteByQueryAction.INSTANCE.newRequestBuilder(client).source("source") .filter(QueryBuilders.matchAllQuery()), matcher().deleted(DOC_COUNT)); } - private void testCase(String action, AbstractBulkByScrollRequestBuilder request, BulkIndexByScrollResponseMatcher matcher) + private void testCase( + String action, + Function> request, + BulkIndexByScrollResponseMatcher matcher) throws Exception { - logger.info("Blocking search"); - CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH); + /* + * These test cases work by stuffing the search and bulk queues of a single node and + * making sure that we read and write from that node. Because of some "fun" with the + * way that searches work, we need at least one more node to act as the coordinating + * node for the search request. If we didn't do this then the searches would get stuck + * in the queue anyway because we force queue portions of the coordinating node's + * actions. This is not a big deal in normal operations but a real pain when you are + * intentionally stuffing queues hoping for a failure. + */ + final Settings nodeSettings = Settings.builder() + // use pools of size 1 so we can block them + .put("thread_pool.bulk.size", 1) + .put("thread_pool.search.size", 1) + // use queues of size 1 because size 0 is broken and because search requests need the queue to function + .put("thread_pool.bulk.queue_size", 1) + .put("thread_pool.search.queue_size", 1) + .put("node.attr.color", "blue") + .build(); + final String node = internalCluster().startDataOnlyNode(nodeSettings); + final Settings indexSettings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.include.color", "blue") + .build(); + + // Create the source index on the node with small thread pools so we can block them. + client().admin().indices().prepareCreate("source").setSettings(indexSettings).execute().actionGet(); + // Not all test cases use the dest index but those that do require that it be on the node will small thread pools + client().admin().indices().prepareCreate("dest").setSettings(indexSettings).execute().actionGet(); + // Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools. + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < DOC_COUNT; i++) { + bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i)); + } + + Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool()); + BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet(); + assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures()); + client().admin().indices().prepareRefresh("source").get(); + + logger.info("Blocking search"); + CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH, node); + + AbstractBulkByScrollRequestBuilder builder = request.apply(internalCluster().masterClient()); // Make sure we use more than one batch so we have to scroll - request.source().setSize(DOC_COUNT / randomIntBetween(2, 10)); + builder.source().setSize(DOC_COUNT / randomIntBetween(2, 10)); logger.info("Starting request"); - ActionFuture responseListener = request.execute(); + ActionFuture responseListener = builder.execute(); try { logger.info("Waiting for search rejections on the initial search"); assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L))); logger.info("Blocking bulk and unblocking search so we start to get bulk rejections"); - CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK); + CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.BULK, node); initialSearchBlock.await(); logger.info("Waiting for bulk rejections"); @@ -161,7 +213,7 @@ public class RetryTests extends ESSingleNodeTestCase { long initialSearchRejections = taskStatus(action).getSearchRetries(); logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll"); - CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH); + CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH, node); bulkBlock.await(); logger.info("Waiting for search rejections for the scroll"); @@ -187,8 +239,8 @@ public class RetryTests extends ESSingleNodeTestCase { * Blocks the named executor by getting its only thread running a task blocked on a CyclicBarrier and fills the queue with a noop task. * So requests to use this queue should get {@link EsRejectedExecutionException}s. */ - private CyclicBarrier blockExecutor(String name) throws Exception { - ThreadPool threadPool = getInstanceFromNode(ThreadPool.class); + private CyclicBarrier blockExecutor(String name, String node) throws Exception { + ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, node); CyclicBarrier barrier = new CyclicBarrier(2); logger.info("Blocking the [{}] executor", name); threadPool.executor(name).execute(() -> { @@ -211,6 +263,11 @@ public class RetryTests extends ESSingleNodeTestCase { * Fetch the status for a task of type "action". Fails if there aren't exactly one of that type of task running. */ private BulkByScrollTask.Status taskStatus(String action) { + /* + * We always use the master client because we always start the test requests on the + * master. We do this simply to make sure that the test request is not started on the + * node who's queue we're manipulating. + */ ListTasksResponse response = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get(); assertThat(response.getTasks(), hasSize(1)); return (BulkByScrollTask.Status) response.getTasks().get(0).getStatus();