diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java index 51fb027f8f9..5960fcdfbcc 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.google.common.base.Predicate; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -404,14 +405,15 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { } @Test + @Repeat(iterations = 1000) public void testPendingUpdateTask() throws Exception { - Settings zenSettings = settingsBuilder() - .put("discovery.type", "zen").build(); - String node_0 = internalCluster().startNode(zenSettings); - internalCluster().startNodeClient(zenSettings); + Settings settings = settingsBuilder() + .put("discovery.type", "local") + .build(); + String node_0 = internalCluster().startNode(settings); + internalCluster().startNodeClient(settings); - - ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0); + final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0); final CountDownLatch block1 = new CountDownLatch(1); final CountDownLatch invoked1 = new CountDownLatch(1); clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() { @@ -453,33 +455,39 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { }); } + // there might be other tasks in this node, make sure to only take the ones we add into account in this test + // The tasks can be re-ordered, so we need to check out-of-order Set controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")); List pendingClusterTasks = clusterService.pendingTasks(); - assertThat(pendingClusterTasks.size(), equalTo(10)); + assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10)); assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1")); assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true)); for (PendingClusterTask task : pendingClusterTasks) { - assertTrue(controlSources.remove(task.getSource().string())); + controlSources.remove(task.getSource().string()); } assertTrue(controlSources.isEmpty()); controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")); PendingClusterTasksResponse response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet(); - assertThat(response.pendingTasks().size(), equalTo(10)); + assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10)); assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1")); assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true)); for (PendingClusterTask task : response) { - assertTrue(controlSources.remove(task.getSource().string())); + controlSources.remove(task.getSource().string()); } assertTrue(controlSources.isEmpty()); block1.countDown(); invoked2.await(); - pendingClusterTasks = clusterService.pendingTasks(); - assertThat(pendingClusterTasks, empty()); - response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet(); - assertThat(response.pendingTasks(), empty()); + // whenever we test for no tasks, we need to awaitBusy since this is a live node + assertTrue(awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return clusterService.pendingTasks().isEmpty(); + } + })); + waitNoPendingTasksOnAll(); final CountDownLatch block2 = new CountDownLatch(1); final CountDownLatch invoked3 = new CountDownLatch(1); @@ -519,19 +527,20 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { Thread.sleep(100); pendingClusterTasks = clusterService.pendingTasks(); - assertThat(pendingClusterTasks.size(), equalTo(5)); + assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); for (PendingClusterTask task : pendingClusterTasks) { - assertTrue(controlSources.remove(task.getSource().string())); + controlSources.remove(task.getSource().string()); } assertTrue(controlSources.isEmpty()); - response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet(); - assertThat(response.pendingTasks().size(), equalTo(5)); + response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().get(); + assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5)); controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); for (PendingClusterTask task : response) { - assertTrue(controlSources.remove(task.getSource().string())); - assertThat(task.getTimeInQueueInMillis(), greaterThan(0l)); + if (controlSources.remove(task.getSource().string())) { + assertThat(task.getTimeInQueueInMillis(), greaterThan(0l)); + } } assertTrue(controlSources.isEmpty()); block2.countDown(); @@ -613,7 +622,7 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { * Note, this test can only work as long as we have a single thread executor executing the state update tasks! */ @Test - public void testPriorizedTasks() throws Exception { + public void testPrioritizedTasks() throws Exception { Settings settings = settingsBuilder() .put("discovery.type", "local") .build();