This commit is contained in:
parent
05f1cd74a6
commit
37be695d5c
|
@ -245,20 +245,27 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
|
||||||
// stopping.
|
// stopping.
|
||||||
PersistentTasksCustomMetaData tasks = clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
PersistentTasksCustomMetaData tasks = clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||||
PersistentTasksCustomMetaData.PersistentTask<?> task = MlTasks.getDatafeedTask(datafeedId, tasks);
|
PersistentTasksCustomMetaData.PersistentTask<?> task = MlTasks.getDatafeedTask(datafeedId, tasks);
|
||||||
|
|
||||||
|
// It is possible that the datafeed has already detected the job failure and
|
||||||
|
// terminated itself. In this happens there is no persistent task to stop
|
||||||
|
assumeFalse("The datafeed task is null most likely because the datafeed detected the job had failed. " +
|
||||||
|
"This is expected to happen extremely rarely but the test cannot continue in these circumstances.", task == null);
|
||||||
|
|
||||||
UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest =
|
UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest =
|
||||||
new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING);
|
new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING);
|
||||||
PersistentTaskResponse updatePersistentTaskStatusResponse =
|
PersistentTaskResponse updatePersistentTaskStatusResponse =
|
||||||
client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet();
|
client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet();
|
||||||
assertNotNull(updatePersistentTaskStatusResponse.getTask());
|
assertNotNull(updatePersistentTaskStatusResponse.getTask());
|
||||||
|
|
||||||
// Confirm the datafeed state is now stopping - this may take a while to update in cluster state
|
// Confirm the datafeed state is now stopping - this may take a while to update in cluster state
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
|
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
|
||||||
GetDatafeedsStatsAction.Response datafeedStatsResponse =
|
GetDatafeedsStatsAction.Response datafeedStatsResponse =
|
||||||
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
|
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
|
||||||
assertEquals(DatafeedState.STOPPING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
|
assertEquals(DatafeedState.STOPPING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
// Stop the node running the failed job/stopping datafeed
|
// Stop the node running the failed job/stopping datafeed
|
||||||
ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index
|
ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index
|
||||||
internalCluster().stopRandomNode(settings -> jobNode.getName().equals(settings.get("node.name")));
|
internalCluster().stopRandomNode(settings -> jobNode.getName().equals(settings.get("node.name")));
|
||||||
|
|
Loading…
Reference in New Issue