diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index c7a076deb73..5be6ffe2878 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -132,6 +132,7 @@ public final class MlTasks { public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getDatafeedTask(datafeedId, tasks); + // TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects if (task != null && task.getState() != null) { return (DatafeedState) task.getState(); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f367ccc4381..14ca0c3e6fa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -576,7 +576,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu } } else { autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) -> - new BlackHoleAutodetectProcess(job.getId()); + new BlackHoleAutodetectProcess(job.getId(), onProcessCrash); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); analyticsProcessFactory = (jobId, analyticsProcessConfig, state, executorService, onProcessCrash) -> null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 2d8c62223f2..3078715abf5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; @@ -34,6 +35,7 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -68,32 +70,46 @@ public class TransportStopDatafeedAction extends TransportTasksAction expandedDatafeedIds, + static void sortDatafeedIdsByTaskState(Collection expandedDatafeedIds, PersistentTasksCustomMetaData tasks, List startedDatafeedIds, - List stoppingDatafeedIds) { + List stoppingDatafeedIds, + List notStoppedDatafeedIds) { for (String expandedDatafeedId : expandedDatafeedIds) { addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks), - startedDatafeedIds, stoppingDatafeedIds); + startedDatafeedIds, stoppingDatafeedIds, notStoppedDatafeedIds); } } private static void addDatafeedTaskIdAccordingToState(String datafeedId, DatafeedState datafeedState, List startedDatafeedIds, - List stoppingDatafeedIds) { + List stoppingDatafeedIds, + List notStoppedDatafeedIds) { switch (datafeedState) { + case STARTING: + // The STARTING state is not used anywhere at present, so this should never happen. + // At present datafeeds that have a persistent task that hasn't yet been assigned + // a state are reported as STOPPED (which is not great). It could be considered a + // breaking change to introduce the STARTING state though, so let's aim to do it in + // version 8. Also consider treating STARTING like STARTED for stop API behaviour. + notStoppedDatafeedIds.add(datafeedId); + break; case STARTED: startedDatafeedIds.add(datafeedId); + notStoppedDatafeedIds.add(datafeedId); break; case STOPPED: break; case STOPPING: stoppingDatafeedIds.add(datafeedId); + notStoppedDatafeedIds.add(datafeedId); break; default: + assert false : "Unexpected datafeed state " + datafeedState; break; } } @@ -118,7 +134,8 @@ public class TransportStopDatafeedAction extends TransportTasksAction startedDatafeeds = new ArrayList<>(); List stoppingDatafeeds = new ArrayList<>(); - sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds); + List notStoppedDatafeeds = new ArrayList<>(); + sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds); if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) { listener.onResponse(new StopDatafeedAction.Response(true)); return; @@ -126,9 +143,9 @@ public class TransportStopDatafeedAction extends TransportTasksAction listener, - PersistentTasksCustomMetaData tasks, + PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes, List startedDatafeeds, List stoppingDatafeeds) { - Set executorNodes = new HashSet<>(); + final Set executorNodes = new HashSet<>(); for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); if (datafeedTask == null) { @@ -147,10 +164,10 @@ public class TransportStopDatafeedAction extends TransportTasksAction finalListener = ActionListener.wrap( r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener), - listener::onFailure); + e -> { + if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) { + // A node has dropped out of the cluster since we started executing the requests. + // Since stopping an already stopped datafeed is not an error we can try again. + // The datafeeds that were running on the node that dropped out of the cluster + // will just have their persistent tasks cancelled. Datafeeds that were stopped + // by the previous attempt will be noops in the subsequent attempt. + doExecute(task, request, listener); + } else { + listener.onFailure(e); + } + }); super.doExecute(task, request, finalListener); } private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener listener, - PersistentTasksCustomMetaData tasks, final List startedDatafeeds) { + PersistentTasksCustomMetaData tasks, final List notStoppedDatafeeds) { final AtomicInteger counter = new AtomicInteger(); - final AtomicArray failures = new AtomicArray<>(startedDatafeeds.size()); + final AtomicArray failures = new AtomicArray<>(notStoppedDatafeeds.size()); - for (String datafeedId : startedDatafeeds) { + for (String datafeedId : notStoppedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); if (datafeedTask != null) { persistentTasksService.sendRemoveRequest(datafeedTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - if (counter.incrementAndGet() == startedDatafeeds.size()) { + if (counter.incrementAndGet() == notStoppedDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } } @@ -196,23 +224,26 @@ public class TransportStopDatafeedAction extends TransportTasksAction results = new LinkedBlockingDeque<>(); + private final Consumer onProcessCrash; private volatile boolean open = true; - public BlackHoleAutodetectProcess(String jobId) { + public BlackHoleAutodetectProcess(String jobId, Consumer onProcessCrash) { this.jobId = jobId; startTime = ZonedDateTime.now(); + this.onProcessCrash = Objects.requireNonNull(onProcessCrash); } @Override @@ -59,7 +67,13 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { } @Override - public void writeRecord(String[] record) throws IOException { + public void writeRecord(String[] record) { + if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) { + open = false; + onProcessCrash.accept("simulated failure"); + AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null); + results.add(result); + } } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java index da390b61062..6ce29752108 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; public class TransportStopDatafeedActionTests extends ESTestCase { @@ -27,17 +26,21 @@ public class TransportStopDatafeedActionTests extends ESTestCase { List startedDatafeeds = new ArrayList<>(); List stoppingDatafeeds = new ArrayList<>(); + List notStoppedDatafeeds = new ArrayList<>(); TransportStopDatafeedAction.sortDatafeedIdsByTaskState( - Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds); + Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds); assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds); assertEquals(Collections.emptyList(), stoppingDatafeeds); + assertEquals(Collections.singletonList("datafeed_1"), notStoppedDatafeeds); startedDatafeeds.clear(); stoppingDatafeeds.clear(); + notStoppedDatafeeds.clear(); TransportStopDatafeedAction.sortDatafeedIdsByTaskState( - Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds); + Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds); assertEquals(Collections.emptyList(), startedDatafeeds); assertEquals(Collections.emptyList(), stoppingDatafeeds); + assertEquals(Collections.emptyList(), notStoppedDatafeeds); } public void testSortDatafeedIdsByTaskState_GivenAll() { @@ -50,15 +53,17 @@ public class TransportStopDatafeedActionTests extends ESTestCase { List startedDatafeeds = new ArrayList<>(); List stoppingDatafeeds = new ArrayList<>(); + List notStoppedDatafeeds = new ArrayList<>(); TransportStopDatafeedAction.sortDatafeedIdsByTaskState( - new HashSet<>(Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3")), tasks, startedDatafeeds, stoppingDatafeeds); + Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds); assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds); assertEquals(Collections.singletonList("datafeed_3"), stoppingDatafeeds); + assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), notStoppedDatafeeds); startedDatafeeds.clear(); stoppingDatafeeds.clear(); TransportStopDatafeedAction.sortDatafeedIdsByTaskState(Collections.singleton("datafeed_2"), tasks, startedDatafeeds, - stoppingDatafeeds); + stoppingDatafeeds, notStoppedDatafeeds); assertEquals(Collections.emptyList(), startedDatafeeds); assertEquals(Collections.emptyList(), stoppingDatafeeds); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 508de8f0c9f..4cb66b44b5f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -21,16 +23,20 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.persistent.PersistentTaskResponse; import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response.JobStats; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -41,6 +47,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.io.IOException; @@ -142,7 +149,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { String jobId = "test-lose-ml-node"; String datafeedId = jobId + "-datafeed"; - setupJobAndDatafeed(jobId, datafeedId); + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); waitForDatafeed(jobId, numDocs1); // stop the only ML node @@ -173,6 +180,158 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { assertTrue(closeJobResponse.isClosed()); } + public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("Starting master/data nodes..."); + for (int count = 0; count < 3; ++count) { + internalCluster().startNode(Settings.builder() + .put("node.master", true) + .put("node.data", true) + .put("node.ml", false) + .build()); + } + logger.info("Starting dedicated ml node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", false) + .put("node.data", false) + .put("node.ml", true) + .build()); + ensureStableClusterOnAllNodes(4); + + // index some datafeed data + client().admin().indices().prepareCreate("data") + .addMapping("type", "time", "type=date") + .get(); + long numDocs1 = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long weekAgo = now - 604800000; + long twoWeeksAgo = weekAgo - 604800000; + indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); + + String jobId = "test-stop-unassigned-datafeed-for-failed-job"; + String datafeedId = jobId + "-datafeed"; + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); + waitForDatafeed(jobId, numDocs1); + + // Job state should be opened here + GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.OPENED, jobStatsResponse.getResponse().results().get(0).getState()); + DiscoveryNode jobNode = jobStatsResponse.getResponse().results().get(0).getNode(); + + // Post the job a record that will result in the job receiving a timestamp in epoch + // seconds equal to the maximum integer - this makes the blackhole autodetect fail. + // It's better to do this than the approach of directly updating the job state using + // the approach used below for datafeeds, because when the job fails at the "process" + // level it sets off a more realistic chain reaction in the layers that wrap the "process" + // (remember it's not a real native process in these internal cluster tests). + PostDataAction.Request postDataRequest = new PostDataAction.Request(jobId); + postDataRequest.setContent( + new BytesArray("{ \"time\" : \"" + BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE_AS_DATE + "\" }"), XContentType.JSON); + PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).actionGet(); + assertEquals(1L, postDataResponse.getDataCounts().getInputRecordCount()); + + // Confirm the job state is now failed + jobStatsRequest = new GetJobsStatsAction.Request(jobId); + jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.FAILED, jobStatsResponse.getResponse().results().get(0).getState()); + + // It's impossible to reliably get the datafeed into a stopping state at the point when the ML node is removed from the cluster + // using externally accessible actions. The only way this situation could occur in reality is through extremely unfortunate + // timing. Therefore, to simulate this unfortunate timing we cheat and access internal classes to set the datafeed state to + // stopping. + PersistentTasksCustomMetaData tasks = clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask task = MlTasks.getDatafeedTask(datafeedId, tasks); + UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest = + new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING); + PersistentTaskResponse updatePersistentTaskStatusResponse = + client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet(); + assertNotNull(updatePersistentTaskStatusResponse.getTask()); + + // Confirm the datafeed state is now stopping + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // Stop the node running the failed job/stopping datafeed + ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index + internalCluster().stopRandomNode(settings -> jobNode.getName().equals(settings.get("node.name"))); + ensureStableCluster(3); + + // We should be allowed to force stop the unassigned datafeed even though it is stopping and its job has failed + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(true); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + // Confirm the datafeed state is now stopped + datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // We should be allowed to force stop the unassigned failed job + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + closeJobRequest.setForce(true); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + } + + public void testStopAndForceStopDatafeed() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("Starting dedicated master node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", true) + .put("node.data", true) + .put("node.ml", false) + .build()); + logger.info("Starting ml and data node..."); + internalCluster().startNode(Settings.builder() + .put("node.master", false) + .build()); + ensureStableClusterOnAllNodes(2); + + // index some datafeed data + client().admin().indices().prepareCreate("data") + .addMapping("type", "time", "type=date") + .get(); + long numDocs1 = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long weekAgo = now - 604800000; + long twoWeeksAgo = weekAgo - 604800000; + indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); + + String jobId = "test-stop-and-force-stop"; + String datafeedId = jobId + "-datafeed"; + setupJobAndDatafeed(jobId, datafeedId, TimeValue.timeValueHours(1)); + waitForDatafeed(jobId, numDocs1); + + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // Stop the datafeed normally + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + ActionFuture normalStopActionFuture + = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest); + + // Force stop the datafeed without waiting for the normal stop to return first + stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(true); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + // Confirm that the normal stop also reports success - whichever way the datafeed + // ends up getting stopped it's not an error to stop a stopped datafeed + stopDatafeedResponse = normalStopActionFuture.actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + } + public void testJobRelocationIsMemoryAware() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1); @@ -243,12 +402,12 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { }); } - private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception { + private void setupJobAndDatafeed(String jobId, String datafeedId, TimeValue datafeedFrequency) throws Exception { Job.Builder job = createScheduledJob(jobId); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); - DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data")); + DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"), datafeedFrequency); PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet(); @@ -273,7 +432,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { long twoWeeksAgo = weekAgo - 604800000; indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo); - setupJobAndDatafeed(jobId, "data_feed_id"); + setupJobAndDatafeed(jobId, "data_feed_id", TimeValue.timeValueSeconds(1)); waitForDatafeed(jobId, numDocs1); client().admin().indices().prepareSyncedFlush().get(); @@ -352,5 +511,4 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { ensureStableCluster(nodeCount, nodeName); } } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java index 4b11fd813ce..48439d6f685 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java @@ -11,17 +11,45 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class BlackHoleAutodetectProcessTests extends ESTestCase { public void testFlushJob_writesAck() throws Exception { - try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) { + try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo", failureReason -> {})) { String flushId = process.flushJob(FlushJobParams.builder().build()); Iterator iterator = process.readAutodetectResults(); - iterator.hasNext(); + assertTrue(iterator.hasNext()); AutodetectResult result = iterator.next(); FlushAcknowledgement ack = result.getFlushAcknowledgement(); assertEquals(flushId, ack.getId()); } } + + public void testSimulatedFailure() throws Exception { + AtomicReference failureReason = new AtomicReference<>(); + try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo", failureReason::set)) { + Iterator iterator = process.readAutodetectResults(); + process.writeRecord(new String[] { BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE}); + assertFalse(process.isProcessAlive()); + assertTrue(iterator.hasNext()); + AutodetectResult result = iterator.next(); + assertThat(result.getModelSizeStats(), nullValue()); + assertThat(result.getBucket(), nullValue()); + assertThat(result.getFlushAcknowledgement(), nullValue()); + assertThat(result.getCategoryDefinition(), nullValue()); + assertThat(result.getModelSnapshot(), nullValue()); + assertThat(result.getQuantiles(), nullValue()); + assertThat(result.getInfluencers(), nullValue()); + assertThat(result.getModelPlot(), nullValue()); + assertThat(result.getRecords(), nullValue()); + assertThat(result.getForecast(), nullValue()); + assertThat(result.getForecastRequestStats(), nullValue()); + assertFalse(iterator.hasNext()); + } + assertThat(failureReason.get(), equalTo("simulated failure")); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 68aa8d2aeec..c0414caf7f7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -196,13 +196,21 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { } public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List indices) { - return createDatafeedBuilder(datafeedId, jobId, indices).build(); + return createDatafeed(datafeedId, jobId, indices, TimeValue.timeValueSeconds(1)); + } + + public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List indices, TimeValue frequency) { + return createDatafeedBuilder(datafeedId, jobId, indices, frequency).build(); } public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices) { + return createDatafeedBuilder(datafeedId, jobId, indices, TimeValue.timeValueSeconds(1)); + } + + public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices, TimeValue frequency) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); builder.setQueryDelay(TimeValue.timeValueSeconds(1)); - builder.setFrequency(TimeValue.timeValueSeconds(1)); + builder.setFrequency(frequency); builder.setIndices(indices); return builder; }