[ML] Kill autodetect on force close and isolated node rejoining (elastic/x-pack-elasticsearch#1742)

Prior to this change, if the persistent tasks framework noticed that a
job was running on a node that was isolated but has rejoined the cluster
then it would close that job.  This was not ideal, because then the job
would persist state from the autodetect process that was isolated.  This
commit changes the behaviour to kill the autodetect process associated
with such a job, so that it does not interfere with the autodetect process
that is running on the node where the persistent tasks framework thinks it
should be running.

In order to achieve this a change has also been made to the behaviour of
force-close.  Previously this would result in the autodetect process being
gracefully shut down asynchronously to the force-close request.  However,
the mechanism by which this happened was the same as the mechanism for
cancelling tasks that end up running on more than one node due to nodes
becoming isolated from the cluster.  Therefore, force-close now also kills
the autodetect process rather than gracefully stopping it.  The documentation
has been changed to reflect this.  It should not be a problem as force-close
is supposed to be a last resort for when normal close fails.

relates elastic/x-pack-elasticsearch#1186

Original commit: elastic/x-pack-elasticsearch@578c944371
This commit is contained in:
David Roberts 2017-06-19 10:16:51 +01:00 committed by GitHub
parent 44c3c6b992
commit 03652e7497
11 changed files with 224 additions and 48 deletions

View File

@ -30,10 +30,12 @@ are no longer required to process data.
When a {dfeed} that has a specified end date stops, it automatically closes
the job.
NOTE: If you use the `force` query parameter, the request returns before the
associated actions such as flushing buffers and persisting the model snapshots
complete. Therefore, do not use that parameter in a script that expects the job
to be in a consistent state after the close job API returns.
NOTE: If you use the `force` query parameter, the request returns without performing
the associated actions such as flushing buffers and persisting the model snapshots.
Therefore, do not use this parameter if you want the job to be in a consistent state
after the close job API returns. The `force` query parameter should only be used in
situations where the job has already failed, or where you are not interested in
results the job might have recently produced or might produce in the future.
==== Path Parameters

View File

@ -280,9 +280,9 @@ public class MachineLearning implements ActionPlugin {
throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e);
}
} else {
autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters,
autodetectProcessFactory = (job, modelSnapshot, quantiles, filters,
ignoreDowntime, executorService, onProcessCrash) ->
new BlackHoleAutodetectProcess();
new BlackHoleAutodetectProcess(job.getId());
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,
executorService) -> new MultiplyingNormalizerProcess(settings, 1.0);

View File

@ -140,7 +140,7 @@ public class KillProcessAction extends Action<KillProcessAction.Request, KillPro
auditor.info(jobTask.getJobId(), Messages.JOB_AUDIT_KILLING);
try {
processManager.killProcess(jobTask, true);
processManager.killProcess(jobTask, true, null);
listener.onResponse(new Response(true));
} catch (Exception e) {
listener.onFailure(e);

View File

@ -349,7 +349,11 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
protected void onCancelled() {
String reason = getReasonCancelled();
closeJob(reason);
killJob(reason);
}
void killJob(String reason) {
autodetectProcessManager.killProcess(this, false, reason);
}
void closeJob(String reason) {

View File

@ -133,9 +133,14 @@ public class AutodetectProcessManager extends AbstractComponent {
}
}
public void killProcess(JobTask jobTask, boolean awaitCompletion) {
public void killProcess(JobTask jobTask, boolean awaitCompletion, String reason) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobTask.getAllocationId());
if (communicator != null) {
if (reason == null) {
logger.info("Killing job [{}]", jobTask.getJobId());
} else {
logger.info("Killing job [{}], because [{}]", jobTask.getJobId(), reason);
}
killProcess(communicator, jobTask.getJobId(), awaitCompletion, true);
}
}

View File

@ -10,10 +10,12 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@ -31,11 +33,13 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
private static final String FLUSH_ID = "flush-1";
private final String jobId;
private final ZonedDateTime startTime;
private final BlockingQueue<AutodetectResult> results = new LinkedBlockingDeque<>();
private volatile boolean open = true;
public BlackHoleAutodetectProcess() {
public BlackHoleAutodetectProcess(String jobId) {
this.jobId = jobId;
startTime = ZonedDateTime.now();
}
@ -74,8 +78,13 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override
public void close() throws IOException {
if (open) {
Quantiles quantiles = new Quantiles(jobId, new Date(), "black hole quantiles");
AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null);
results.add(result);
open = false;
}
}
@Override
public void kill() throws IOException {
@ -95,10 +104,11 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
while (open) {
result = results.poll(100, TimeUnit.MILLISECONDS);
if (result != null) {
break;
return true;
}
}
return open;
result = results.poll();
return result != null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;

View File

@ -61,29 +61,17 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
awaitJobOpenedAndAssigned(job.getId(), null);
internalCluster().stopRandomDataNode();
ensureStableCluster(3);
ensureGreen();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
awaitJobOpenedAndAssigned(job.getId(), null);
internalCluster().stopRandomDataNode();
ensureStableCluster(2);
ensureGreen();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
awaitJobOpenedAndAssigned(job.getId(), null);
}
public void testFailOverBasics_withDataFeeder() throws Exception {
@ -112,11 +100,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
awaitJobOpenedAndAssigned(job.getId(), null);
StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest);
@ -130,11 +114,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
internalCluster().stopRandomDataNode();
ensureStableCluster(3);
ensureGreen();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
awaitJobOpenedAndAssigned(job.getId(), null);
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet();
@ -145,11 +125,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
internalCluster().stopRandomDataNode();
ensureStableCluster(2);
ensureGreen();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
awaitJobOpenedAndAssigned(job.getId(), null);
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet();
@ -390,8 +366,10 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet());
assertTrue(e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation"));
assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-shared]]"));
assertTrue(e.getMessage(),
e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation"));
assertTrue(e.getMessage(), e.getMessage().endsWith("because not all primary shards are active for the following indices "
+ "[.ml-state,.ml-anomalies-shared]]"));
logger.info("Start data node");
String nonMlNode = internalCluster().startNode(Settings.builder()

View File

@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class NetworkDisruptionIT extends BaseMlIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but not too long so test won't time out
.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockTransportService.TestPlugin.class);
return plugins;
}
public void testJobRelocation() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(5);
ensureStableCluster(5);
Job.Builder job = createJob("relocation-job");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
OpenJobAction.Response openJobResponse = client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertTrue(openJobResponse.isAcknowledged());
// Record which node the job starts off on
String origJobNode = awaitJobOpenedAndAssigned(job.getId(), null);
// Isolate the node the job is running on from the cluster
Set<String> isolatedSide = Collections.singleton(origJobNode);
Set<String> restOfClusterSide = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
restOfClusterSide.remove(origJobNode);
String notIsolatedNode = restOfClusterSide.iterator().next();
NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(isolatedSide, restOfClusterSide),
new NetworkDisruption.NetworkDisconnect());
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
ensureStableCluster(4, notIsolatedNode);
// Job should move to a new node in the bigger portion of the cluster
String newJobNode = awaitJobOpenedAndAssigned(job.getId(), notIsolatedNode);
assertNotEquals(origJobNode, newJobNode);
networkDisruption.removeAndEnsureHealthy(internalCluster());
ensureGreen();
// Job should remain running on the new node, not the one that temporarily detached from the cluster
String finalJobNode = awaitJobOpenedAndAssigned(job.getId(), null);
assertEquals(newJobNode, finalJobNode);
// The job running on the original node should have been killed, and hence should not have persisted quantiles
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
.setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId())))
.setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
assertEquals(0L, searchResponse.getHits().getTotalHits());
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(job.getId());
CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
assertTrue(closeJobResponse.isClosed());
// The relocated job was closed rather than killed, and hence should have persisted quantiles
searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
.setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId())))
.setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
assertEquals(1L, searchResponse.getHits().getTotalHits());
}
}

View File

@ -15,7 +15,7 @@ import java.util.Iterator;
public class BlackHoleAutodetectProcessTests extends ESTestCase {
public void testFlushJob_writesAck() throws Exception {
try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess()) {
try (BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess("foo")) {
String flushId = process.flushJob(InterimResultsParams.builder().build());
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
iterator.hasNext();

View File

@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
@ -327,4 +328,16 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
}
}
protected String awaitJobOpenedAndAssigned(String jobId, String queryNode) throws Exception {
AtomicReference<String> jobNode = new AtomicReference<>();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client(queryNode).execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet();
GetJobsStatsAction.Response.JobStats jobStats = statsResponse.getResponse().results().get(0);
assertEquals(JobState.OPENED, jobStats.getState());
assertNotNull(jobStats.getNode());
jobNode.set(jobStats.getNode().getName());
});
return jobNode.get();
}
}

View File

@ -517,3 +517,58 @@
count:
index: .ml-state
- match: {count: 0}
---
"Test force close does not create state":
- do:
indices.create:
index: .ml-state
- do:
xpack.ml.put_job:
job_id: index-layout-force-close-job
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: "index-layout-force-close-job" }
- do:
xpack.ml.open_job:
job_id: index-layout-force-close-job
- do:
xpack.ml.post_data:
job_id: index-layout-force-close-job
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
- do:
xpack.ml.close_job:
job_id: index-layout-force-close-job
force: true
- match: { closed: true }
- do:
indices.exists:
index: ".ml-state"
- is_true: ''
- do:
indices.refresh: {}
- do:
count:
index: .ml-state
- match: {count: 0}