[TEST] test didn't take into account other cluster service tasks
The pending tests on an actual node should take into account that other tasks might be executing on that node, thus failing when it happens
This commit is contained in:
parent
045ce097c9
commit
6e99448620
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.cluster;
|
package org.elasticsearch.cluster;
|
||||||
|
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
|
@ -404,14 +405,15 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@Repeat(iterations = 1000)
|
||||||
public void testPendingUpdateTask() throws Exception {
|
public void testPendingUpdateTask() throws Exception {
|
||||||
Settings zenSettings = settingsBuilder()
|
Settings settings = settingsBuilder()
|
||||||
.put("discovery.type", "zen").build();
|
.put("discovery.type", "local")
|
||||||
String node_0 = internalCluster().startNode(zenSettings);
|
.build();
|
||||||
internalCluster().startNodeClient(zenSettings);
|
String node_0 = internalCluster().startNode(settings);
|
||||||
|
internalCluster().startNodeClient(settings);
|
||||||
|
|
||||||
|
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0);
|
||||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0);
|
|
||||||
final CountDownLatch block1 = new CountDownLatch(1);
|
final CountDownLatch block1 = new CountDownLatch(1);
|
||||||
final CountDownLatch invoked1 = new CountDownLatch(1);
|
final CountDownLatch invoked1 = new CountDownLatch(1);
|
||||||
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
|
||||||
|
@ -453,33 +455,39 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// there might be other tasks in this node, make sure to only take the ones we add into account in this test
|
||||||
|
|
||||||
// The tasks can be re-ordered, so we need to check out-of-order
|
// The tasks can be re-ordered, so we need to check out-of-order
|
||||||
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
|
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
|
||||||
List<PendingClusterTask> pendingClusterTasks = clusterService.pendingTasks();
|
List<PendingClusterTask> pendingClusterTasks = clusterService.pendingTasks();
|
||||||
assertThat(pendingClusterTasks.size(), equalTo(10));
|
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
|
||||||
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
|
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
|
||||||
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
|
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
|
||||||
for (PendingClusterTask task : pendingClusterTasks) {
|
for (PendingClusterTask task : pendingClusterTasks) {
|
||||||
assertTrue(controlSources.remove(task.getSource().string()));
|
controlSources.remove(task.getSource().string());
|
||||||
}
|
}
|
||||||
assertTrue(controlSources.isEmpty());
|
assertTrue(controlSources.isEmpty());
|
||||||
|
|
||||||
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
|
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
|
||||||
PendingClusterTasksResponse response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
|
PendingClusterTasksResponse response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
|
||||||
assertThat(response.pendingTasks().size(), equalTo(10));
|
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
|
||||||
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
|
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
|
||||||
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
|
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
|
||||||
for (PendingClusterTask task : response) {
|
for (PendingClusterTask task : response) {
|
||||||
assertTrue(controlSources.remove(task.getSource().string()));
|
controlSources.remove(task.getSource().string());
|
||||||
}
|
}
|
||||||
assertTrue(controlSources.isEmpty());
|
assertTrue(controlSources.isEmpty());
|
||||||
block1.countDown();
|
block1.countDown();
|
||||||
invoked2.await();
|
invoked2.await();
|
||||||
|
|
||||||
pendingClusterTasks = clusterService.pendingTasks();
|
// whenever we test for no tasks, we need to awaitBusy since this is a live node
|
||||||
assertThat(pendingClusterTasks, empty());
|
assertTrue(awaitBusy(new Predicate<Object>() {
|
||||||
response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
|
@Override
|
||||||
assertThat(response.pendingTasks(), empty());
|
public boolean apply(Object input) {
|
||||||
|
return clusterService.pendingTasks().isEmpty();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
waitNoPendingTasksOnAll();
|
||||||
|
|
||||||
final CountDownLatch block2 = new CountDownLatch(1);
|
final CountDownLatch block2 = new CountDownLatch(1);
|
||||||
final CountDownLatch invoked3 = new CountDownLatch(1);
|
final CountDownLatch invoked3 = new CountDownLatch(1);
|
||||||
|
@ -519,20 +527,21 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
pendingClusterTasks = clusterService.pendingTasks();
|
pendingClusterTasks = clusterService.pendingTasks();
|
||||||
assertThat(pendingClusterTasks.size(), equalTo(5));
|
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
|
||||||
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
|
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
|
||||||
for (PendingClusterTask task : pendingClusterTasks) {
|
for (PendingClusterTask task : pendingClusterTasks) {
|
||||||
assertTrue(controlSources.remove(task.getSource().string()));
|
controlSources.remove(task.getSource().string());
|
||||||
}
|
}
|
||||||
assertTrue(controlSources.isEmpty());
|
assertTrue(controlSources.isEmpty());
|
||||||
|
|
||||||
response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
|
response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().get();
|
||||||
assertThat(response.pendingTasks().size(), equalTo(5));
|
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
|
||||||
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
|
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
|
||||||
for (PendingClusterTask task : response) {
|
for (PendingClusterTask task : response) {
|
||||||
assertTrue(controlSources.remove(task.getSource().string()));
|
if (controlSources.remove(task.getSource().string())) {
|
||||||
assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
|
assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
assertTrue(controlSources.isEmpty());
|
assertTrue(controlSources.isEmpty());
|
||||||
block2.countDown();
|
block2.countDown();
|
||||||
}
|
}
|
||||||
|
@ -613,7 +622,7 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
|
||||||
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
|
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testPriorizedTasks() throws Exception {
|
public void testPrioritizedTasks() throws Exception {
|
||||||
Settings settings = settingsBuilder()
|
Settings settings = settingsBuilder()
|
||||||
.put("discovery.type", "local")
|
.put("discovery.type", "local")
|
||||||
.build();
|
.build();
|
||||||
|
|
Loading…
Reference in New Issue