diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java index b020c3d6b88..b71acc9dfe6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterState; @@ -28,14 +29,15 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; -import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class IndexPrimaryRelocationIT extends ESIntegTestCase { @@ -54,7 +56,7 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase { Thread indexingThread = new Thread() { @Override public void run() { - while (finished.get() == false) { + while (finished.get() == false && numAutoGenDocs.get() < 10_000) { IndexResponse indexResponse = client().prepareIndex("test", "type", "id").setSource("field", "value").get(); assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); DeleteResponse deleteResponse = client().prepareDelete("test", "type", "id").get(); @@ -80,8 +82,18 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase { .add(new MoveAllocationCommand("test", 0, relocationSource.getId(), relocationTarget.getId())) .execute().actionGet(); ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(60)) .setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).execute().actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + if (clusterHealthResponse.isTimedOut()) { + final String hotThreads = client().admin().cluster().prepareNodesHotThreads().setIgnoreIdleThreads(false).get().getNodes() + .stream().map(NodeHotThreads::getHotThreads).collect(Collectors.joining("\n")); + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + logger.info("timed out for waiting for relocation iteration [{}] \ncluster state {} \nhot threads {}", + i, clusterState, hotThreads); + finished.set(true); + indexingThread.join(); + throw new AssertionError("timed out waiting for relocation iteration [" + i + "] "); + } logger.info("--> [iteration {}] relocation complete", i); relocationSource = relocationTarget; // indexing process aborted early, no need for more relocations as test has already failed diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 369daef08d1..a448736ed55 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -78,7 +78,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -446,7 +445,7 @@ public class RelocationIT extends ESIntegTestCase { } } - public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException { + public void testIndexAndRelocateConcurrently() throws Exception { int halfNodes = randomIntBetween(1, 3); Settings[] nodeSettings = Stream.concat( Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes), @@ -494,7 +493,7 @@ public class RelocationIT extends ESIntegTestCase { numDocs *= 2; logger.info(" --> waiting for relocation to complete"); - ensureGreen("test"); // move all shards to the new nodes (it waits on relocation) + ensureGreen(TimeValue.timeValueSeconds(60), "test"); // move all shards to the new nodes (it waits on relocation) final int numIters = randomIntBetween(10, 20); for (int i = 0; i < numIters; i++) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 035ce22094f..7ccc524b8b4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -918,10 +919,13 @@ public abstract class ESIntegTestCase extends ESTestCase { ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet(); if (actionGet.isTimedOut()) { - logger.info("{} timed out, cluster state:\n{}\n{}", + final String hotThreads = client().admin().cluster().prepareNodesHotThreads().setIgnoreIdleThreads(false).get().getNodes() + .stream().map(NodeHotThreads::getHotThreads).collect(Collectors.joining("\n")); + logger.info("{} timed out, cluster state:\n{}\npending tasks:\n{}\nhot threads:\n{}\n", method, client().admin().cluster().prepareState().get().getState(), - client().admin().cluster().preparePendingClusterTasks().get()); + client().admin().cluster().preparePendingClusterTasks().get(), + hotThreads); fail("timed out waiting for " + color + " state"); } assertThat("Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(),