diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index d91edf23463..a4dcd382a2e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -245,20 +245,27 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { // stopping. PersistentTasksCustomMetaData tasks = clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData.PersistentTask task = MlTasks.getDatafeedTask(datafeedId, tasks); + + // It is possible that the datafeed has already detected the job failure and + // terminated itself. In this happens there is no persistent task to stop + assumeFalse("The datafeed task is null most likely because the datafeed detected the job had failed. " + + "This is expected to happen extremely rarely but the test cannot continue in these circumstances.", task == null); + UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest = - new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING); + new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING); PersistentTaskResponse updatePersistentTaskStatusResponse = - client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet(); + client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet(); assertNotNull(updatePersistentTaskStatusResponse.getTask()); // Confirm the datafeed state is now stopping - this may take a while to update in cluster state assertBusy(() -> { GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); GetDatafeedsStatsAction.Response datafeedStatsResponse = - client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); assertEquals(DatafeedState.STOPPING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); }); + // Stop the node running the failed job/stopping datafeed ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index internalCluster().stopRandomNode(settings -> jobNode.getName().equals(settings.get("node.name")));