diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 670a2024cef..68cd883193d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -44,13 +44,13 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { public void testFailOver() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); - ensureStableCluster(3); + ensureStableClusterOnAllNodes(3); run(() -> { GetJobsStatsAction.Request request = new GetJobsStatsAction.Request("job_id"); GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet(); DiscoveryNode discoveryNode = response.getResponse().results().get(0).getNode(); internalCluster().stopRandomNode(settings -> discoveryNode.getName().equals(settings.get("node.name"))); - ensureStableCluster(2); + ensureStableClusterOnAllNodes(2); }); } @@ -67,7 +67,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { String mlAndDataNode = internalCluster().startNode(Settings.builder() .put("node.master", false) .build()); - ensureStableCluster(2); + ensureStableClusterOnAllNodes(2); run(() -> { logger.info("Stopping dedicated master node"); internalCluster().stopRandomNode(settings -> settings.getAsBoolean("node.master", false)); @@ -82,13 +82,13 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { .put("node.data", false) .put("node.ml", false) .build()); - ensureStableCluster(2); + ensureStableClusterOnAllNodes(2); }); } public void testFullClusterRestart() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); - ensureStableCluster(3); + ensureStableClusterOnAllNodes(3); run(() -> { logger.info("Restarting all nodes"); internalCluster().fullRestart(); @@ -131,6 +131,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { assertEquals(numDocs1, dataCounts.getProcessedRecordCount()); assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount()); }); + client().admin().indices().prepareSyncedFlush().get(); disrupt.run(); assertBusy(() -> { @@ -186,4 +187,10 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { } } + private void ensureStableClusterOnAllNodes(int nodeCount) { + for (String nodeName : internalCluster().getNodeNames()) { + ensureStableCluster(nodeCount, nodeName); + } + } + }