diff --git a/docs/en/rest-api/ml/close-job.asciidoc b/docs/en/rest-api/ml/close-job.asciidoc index 947584ae651..9faf23e8bdc 100644 --- a/docs/en/rest-api/ml/close-job.asciidoc +++ b/docs/en/rest-api/ml/close-job.asciidoc @@ -30,10 +30,12 @@ are no longer required to process data. When a {dfeed} that has a specified end date stops, it automatically closes the job. -NOTE: If you use the `force` query parameter, the request returns before the -associated actions such as flushing buffers and persisting the model snapshots -complete. Therefore, do not use that parameter in a script that expects the job -to be in a consistent state after the close job API returns. +NOTE: If you use the `force` query parameter, the request returns without performing +the associated actions such as flushing buffers and persisting the model snapshots. +Therefore, do not use this parameter if you want the job to be in a consistent state +after the close job API returns. The `force` query parameter should only be used in +situations where the job has already failed, or where you are not interested in +results the job might have recently produced or might produce in the future. ==== Path Parameters diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index abfa0ba4c6a..dbe8a4cfbb1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -280,9 +280,9 @@ public class MachineLearning implements ActionPlugin { throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e); } } else { - autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, + autodetectProcessFactory = (job, modelSnapshot, quantiles, filters, ignoreDowntime, executorService, onProcessCrash) -> - new BlackHoleAutodetectProcess(); + new BlackHoleAutodetectProcess(job.getId()); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization, executorService) -> new MultiplyingNormalizerProcess(settings, 1.0); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/KillProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/KillProcessAction.java index b78aab9b122..a80196a4807 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/KillProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/KillProcessAction.java @@ -140,7 +140,7 @@ public class KillProcessAction extends Action results = new LinkedBlockingDeque<>(); private volatile boolean open = true; - public BlackHoleAutodetectProcess() { + public BlackHoleAutodetectProcess(String jobId) { + this.jobId = jobId; startTime = ZonedDateTime.now(); } @@ -74,7 +78,12 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { @Override public void close() throws IOException { - open = false; + if (open) { + Quantiles quantiles = new Quantiles(jobId, new Date(), "black hole quantiles"); + AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null); + results.add(result); + open = false; + } } @Override @@ -95,10 +104,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { while (open) { result = results.poll(100, TimeUnit.MILLISECONDS); if (result != null) { - break; + return true; } } - return open; + result = results.poll(); + return result != null; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 8f9b23b7809..d3a5fc3cd6b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -61,29 +61,17 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { ensureGreen(); OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet(); - assertBusy(() -> { - GetJobsStatsAction.Response statsResponse = - client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); - }); + awaitJobOpenedAndAssigned(job.getId(), null); internalCluster().stopRandomDataNode(); ensureStableCluster(3); ensureGreen(); - assertBusy(() -> { - GetJobsStatsAction.Response statsResponse = - client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); - }); + awaitJobOpenedAndAssigned(job.getId(), null); internalCluster().stopRandomDataNode(); ensureStableCluster(2); ensureGreen(); - assertBusy(() -> { - GetJobsStatsAction.Response statsResponse = - client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); - }); + awaitJobOpenedAndAssigned(job.getId(), null); } public void testFailOverBasics_withDataFeeder() throws Exception { @@ -112,11 +100,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { ensureGreen(); OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet(); - assertBusy(() -> { - GetJobsStatsAction.Response statsResponse = - client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); - }); + awaitJobOpenedAndAssigned(job.getId(), null); StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest); @@ -130,11 +114,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { internalCluster().stopRandomDataNode(); ensureStableCluster(3); ensureGreen(); - assertBusy(() -> { - GetJobsStatsAction.Response statsResponse = - client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); - }); + awaitJobOpenedAndAssigned(job.getId(), null); assertBusy(() -> { GetDatafeedsStatsAction.Response statsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet(); @@ -145,11 +125,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { internalCluster().stopRandomDataNode(); ensureStableCluster(2); ensureGreen(); - assertBusy(() -> { - GetJobsStatsAction.Response statsResponse = - client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); - assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); - }); + awaitJobOpenedAndAssigned(job.getId(), null); assertBusy(() -> { GetDatafeedsStatsAction.Response statsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet(); @@ -390,8 +366,10 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { Exception e = expectThrows(ElasticsearchStatusException.class, () -> client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet()); - assertTrue(e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation")); - assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-shared]]")); + assertTrue(e.getMessage(), + e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation")); + assertTrue(e.getMessage(), e.getMessage().endsWith("because not all primary shards are active for the following indices " + + "[.ml-state,.ml-anomalies-shared]]")); logger.info("Start data node"); String nonMlNode = internalCluster().startNode(Settings.builder() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java new file mode 100644 index 00000000000..b100d06db9f --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.FaultDetection; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.discovery.TestZenDiscovery; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.xpack.ml.action.CloseJobAction; +import org.elasticsearch.xpack.ml.action.OpenJobAction; +import org.elasticsearch.xpack.ml.action.PutJobAction; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class NetworkDisruptionIT extends BaseMlIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false) + .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly + .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly + .put("discovery.zen.join_timeout", "10s") // still long to induce failures but not too long so test won't time out + .build(); + } + + @Override + protected Collection> nodePlugins() { + Collection> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(MockTransportService.TestPlugin.class); + return plugins; + } + + public void testJobRelocation() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(5); + ensureStableCluster(5); + + Job.Builder job = createJob("relocation-job"); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + assertTrue(putJobResponse.isAcknowledged()); + ensureGreen(); + + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); + OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet(); + assertTrue(openJobResponse.isAcknowledged()); + + // Record which node the job starts off on + String origJobNode = awaitJobOpenedAndAssigned(job.getId(), null); + + // Isolate the node the job is running on from the cluster + Set isolatedSide = Collections.singleton(origJobNode); + Set restOfClusterSide = new HashSet<>(Arrays.asList(internalCluster().getNodeNames())); + restOfClusterSide.remove(origJobNode); + String notIsolatedNode = restOfClusterSide.iterator().next(); + + NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(isolatedSide, restOfClusterSide), + new NetworkDisruption.NetworkDisconnect()); + internalCluster().setDisruptionScheme(networkDisruption); + networkDisruption.startDisrupting(); + ensureStableCluster(4, notIsolatedNode); + + // Job should move to a new node in the bigger portion of the cluster + String newJobNode = awaitJobOpenedAndAssigned(job.getId(), notIsolatedNode); + assertNotEquals(origJobNode, newJobNode); + + networkDisruption.removeAndEnsureHealthy(internalCluster()); + ensureGreen(); + + // Job should remain running on the new node, not the one that temporarily detached from the cluster + String finalJobNode = awaitJobOpenedAndAssigned(job.getId(), null); + assertEquals(newJobNode, finalJobNode); + + // The job running on the original node should have been killed, and hence should not have persisted quantiles + SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName()) + .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()))) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet(); + assertEquals(0L, searchResponse.getHits().getTotalHits()); + + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(job.getId()); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + + // The relocated job was closed rather than killed, and hence should have persisted quantiles + searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName()) + .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()))) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet(); + assertEquals(1L, searchResponse.getHits().getTotalHits()); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java index c499ac25e13..2418fe185d3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcessTests.java @@ -15,7 +15,7 @@ import java.util.Iterator; public class BlackHoleAutodetectProcessTests extends ESTestCase { public void testFlushJob_writesAck() throws Exception { - try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess()) { + try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) { String flushId = process.flushJob(InterimResultsParams.builder().build()); Iterator iterator = process.readAutodetectResults(); iterator.hasNext(); @@ -24,4 +24,4 @@ public class BlackHoleAutodetectProcessTests extends ESTestCase { assertEquals(flushId, ack.getId()); } } -} \ No newline at end of file +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index d772b57fcf6..5c20426516f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -327,4 +328,16 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { } } + protected String awaitJobOpenedAndAssigned(String jobId, String queryNode) throws Exception { + AtomicReference jobNode = new AtomicReference<>(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client(queryNode).execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet(); + GetJobsStatsAction.Response.JobStats jobStats = statsResponse.getResponse().results().get(0); + assertEquals(JobState.OPENED, jobStats.getState()); + assertNotNull(jobStats.getNode()); + jobNode.set(jobStats.getNode().getName()); + }); + return jobNode.get(); + } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml index 22dfbc75c7f..20e2a6db445 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml @@ -517,3 +517,58 @@ count: index: .ml-state - match: {count: 0} + +--- +"Test force close does not create state": + + - do: + indices.create: + index: .ml-state + + - do: + xpack.ml.put_job: + job_id: index-layout-force-close-job + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "index-layout-force-close-job" } + + - do: + xpack.ml.open_job: + job_id: index-layout-force-close-job + + - do: + xpack.ml.post_data: + job_id: index-layout-force-close-job + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + + - do: + xpack.ml.close_job: + job_id: index-layout-force-close-job + force: true + - match: { closed: true } + + - do: + indices.exists: + index: ".ml-state" + - is_true: '' + + - do: + indices.refresh: {} + + - do: + count: + index: .ml-state + - match: {count: 0} +