[ML] Fix close job when the process has not launched yet (elastic/x-pack-elasticsearch#2616)

If a job close is requested after the job was opened but before
its process was launched, the close returns successfully
without doing anything. The result is that the process hangs
around. This has been causing test failures, as documented
in elastic/x-pack-elasticsearch#2360 and elastic/x-pack-elasticsearch#1270.

This commit fixes this problem by refactoring the
AutodetectProcessManager. It introduces a state pattern
to make the process lifecycle states explicit, and it uses
locking to ensure a close request waits for the job process to be created.

relates elastic/x-pack-elasticsearch#1270

Original commit: elastic/x-pack-elasticsearch@ff858bd136
Dimitris Athanasiou 2017-09-26 11:46:01 +01:00 committed by GitHub
parent a3984f7baa
commit c46e09902d
6 changed files with 295 additions and 88 deletions
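
Before the diff, a minimal self-contained sketch of the locking approach described in the commit message. The names here (ProcessContextSketch, launcher) are illustrative stand-ins, not the classes introduced below; the point is that open and close contend on the same per-job lock, so a close that arrives before the native process has launched waits for the launch to finish and can then stop the process instead of leaving it orphaned.

import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Illustrative sketch only, not part of the commit.
final class ProcessContextSketch<P extends AutoCloseable> {

    enum State { NOT_RUNNING, RUNNING, DYING }

    private final ReentrantLock lock = new ReentrantLock();
    private volatile State state = State.NOT_RUNNING;
    private volatile P process;

    // Open path: launch the process while holding the lock so that a concurrent
    // close() cannot complete between "job opened" and "process launched".
    void open(Supplier<P> launcher) {
        lock.lock();
        try {
            if (state != State.NOT_RUNNING) {
                return; // a concurrent close already marked this context as dying
            }
            process = launcher.get();
            state = State.RUNNING;
        } finally {
            lock.unlock();
        }
    }

    // Close path: acquiring the lock waits for any in-flight open(), so by the
    // time the context is marked dying we know whether a process exists.
    void close() throws Exception {
        lock.lock();
        try {
            state = State.DYING;
        } finally {
            lock.unlock();
        }
        if (process != null) {
            process.close(); // the process was launched, so it can actually be stopped
        }
    }
}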


@ -15,7 +15,6 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
@ -56,7 +55,6 @@ public class AutodetectCommunicator implements Closeable {
private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);
private final Job job;
private final JobTask jobTask;
private final AutodetectProcess autodetectProcess;
private final StateStreamer stateStreamer;
private final DataCountsReporter dataCountsReporter;
@ -66,12 +64,11 @@ public class AutodetectCommunicator implements Closeable {
private final NamedXContentRegistry xContentRegistry;
private volatile boolean processKilled;
AutodetectCommunicator(Job job, JobTask jobTask, AutodetectProcess process, StateStreamer stateStreamer,
AutodetectCommunicator(Job job, AutodetectProcess process, StateStreamer stateStreamer,
DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor,
Consumer<Exception> onFinishHandler, NamedXContentRegistry xContentRegistry,
ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.jobTask = jobTask;
this.autodetectProcess = process;
this.stateStreamer = stateStreamer;
this.dataCountsReporter = dataCountsReporter;
@ -261,10 +258,6 @@ public class AutodetectCommunicator implements Closeable {
}
}
public JobTask getJobTask() {
return jobTask;
}
public ZonedDateTime getProcessStartTime() {
return autodetectProcess.getProcessStartTime();
}


@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
@ -50,7 +49,6 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -59,7 +57,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -105,8 +102,7 @@ public class AutodetectProcessManager extends AbstractComponent {
private final JobResultsPersister jobResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;
private final ConcurrentMap<Long, AutodetectCommunicator> autoDetectCommunicatorByOpenJob = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, AutodetectCommunicator> autoDetectCommunicatorByClosingJob = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, ProcessContext> processByAllocation = new ConcurrentHashMap<>();
private final int maxAllowedRunningJobs;
@ -134,53 +130,37 @@ public class AutodetectProcessManager extends AbstractComponent {
}
public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
int numJobs = autoDetectCommunicatorByOpenJob.size();
int numJobs = processByAllocation.size();
if (numJobs != 0) {
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
for (AutodetectCommunicator communicator : autoDetectCommunicatorByOpenJob.values()) {
closeJob(communicator.getJobTask(), false, reason);
for (ProcessContext process : processByAllocation.values()) {
closeJob(process.getJobTask(), false, reason);
}
}
}
public void killProcess(JobTask jobTask, boolean awaitCompletion, String reason) {
String extraInfo;
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.remove(jobTask.getAllocationId());
if (communicator == null) {
extraInfo = " while closing";
// if there isn't an open job, check for a closing job
communicator = autoDetectCommunicatorByClosingJob.remove(jobTask.getAllocationId());
} else {
extraInfo = "";
}
if (communicator != null) {
if (reason == null) {
logger.info("Killing job [{}]{}", jobTask.getJobId(), extraInfo);
} else {
logger.info("Killing job [{}]{}, because [{}]", jobTask.getJobId(), extraInfo, reason);
}
killProcess(communicator, jobTask.getJobId(), awaitCompletion, true);
ProcessContext processContext = processByAllocation.remove(jobTask.getAllocationId());
if (processContext != null) {
processContext.newKillBuilder()
.setAwaitCompletion(awaitCompletion)
.setFinish(true)
.setReason(reason)
.kill();
}
}
public void killAllProcessesOnThisNode() {
// first kill open jobs, then closing jobs
for (Iterator<AutodetectCommunicator> iter : Arrays.asList(autoDetectCommunicatorByOpenJob.values().iterator(),
autoDetectCommunicatorByClosingJob.values().iterator())) {
while (iter.hasNext()) {
AutodetectCommunicator communicator = iter.next();
iter.remove();
killProcess(communicator, communicator.getJobTask().getJobId(), false, false);
}
}
}
private void killProcess(AutodetectCommunicator communicator, String jobId, boolean awaitCompletion, boolean finish) {
try {
communicator.killProcess(awaitCompletion, finish);
} catch (IOException e) {
logger.error("[{}] Failed to kill autodetect process for job", jobId);
Iterator<ProcessContext> iterator = processByAllocation.values().iterator();
while (iterator.hasNext()) {
ProcessContext processContext = iterator.next();
processContext.newKillBuilder()
.setAwaitCompletion(false)
.setFinish(false)
.setSilent(true)
.kill();
iterator.remove();
}
}
@ -205,7 +185,7 @@ public class AutodetectProcessManager extends AbstractComponent {
*/
public void processData(JobTask jobTask, InputStream input, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open");
}
@ -223,7 +203,7 @@ public class AutodetectProcessManager extends AbstractComponent {
*/
public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<FlushAcknowledgement> handler) {
logger.debug("Flushing job {}", jobTask.getJobId());
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
logger.debug(message);
@ -250,7 +230,7 @@ public class AutodetectProcessManager extends AbstractComponent {
*/
public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Exception> handler) {
logger.debug("Forecasting job {}", jobTask.getJobId());
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId());
logger.debug(message);
@ -271,7 +251,7 @@ public class AutodetectProcessManager extends AbstractComponent {
public void writeUpdateProcessMessage(JobTask jobTask, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config,
Consumer<Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
logger.debug(message);
@ -298,6 +278,7 @@ public class AutodetectProcessManager extends AbstractComponent {
}
logger.info("Opening job [{}]", jobId);
processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask));
jobProvider.getAutodetectParams(job, params -> {
// We need to fork, otherwise we restore model state from a network thread (several GET api calls):
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@ -308,19 +289,29 @@ public class AutodetectProcessManager extends AbstractComponent {
@Override
protected void doRun() throws Exception {
ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId());
if (processContext == null) {
logger.debug("Aborted opening job [{}] as it has been closed", jobId);
return;
}
if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
logger.debug("Cannot open job [{}] when its state is [{}]", jobId, processContext.getState().getClass().getName());
return;
}
try {
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.computeIfAbsent(jobTask.getAllocationId(),
id -> create(jobTask, params, handler));
communicator.init(params.modelSnapshot());
createProcessAndSetRunning(processContext, params, handler);
processContext.getAutodetectCommunicator().init(params.modelSnapshot());
setJobState(jobTask, JobState.OPENED);
} catch (Exception e1) {
// No need to log here as the persistent task framework will log it
try {
// Don't leave a partially initialised process hanging around
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.remove(jobTask.getAllocationId());
if (communicator != null) {
communicator.killProcess(false, false);
}
processContext.newKillBuilder()
.setAwaitCompletion(false)
.setFinish(false)
.kill();
processByAllocation.remove(jobTask.getAllocationId());
} finally {
setJobState(jobTask, JobState.FAILED, e2 -> handler.accept(e1));
}
@ -333,13 +324,28 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
private void createProcessAndSetRunning(ProcessContext processContext, AutodetectParams params, Consumer<Exception> handler) {
try {
// At this point we lock the process context until the process has been started.
// The reason behind this is to ensure closing the job does not happen before
// the process is started as that can result to the job getting seemingly closed
// but the actual process is hanging alive.
processContext.tryLock();
AutodetectCommunicator communicator = create(processContext.getJobTask(), params, handler);
processContext.setRunning(communicator);
} finally {
// Now that the process is running and we have updated its state we can unlock.
// It is important to unlock before we initialize the communicator (ie. load the model state)
// as that may be a long-running method.
processContext.unlock();
}
}
AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer<Exception> handler) {
// Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME
// that an open job uses, so include them too when considering if enough threads are available.
// There's a slight possibility that the same key is in both sets, hence it's not sufficient to simply
// add the two map sizes.
int currentRunningJobs = Sets.union(autoDetectCommunicatorByOpenJob.keySet(), autoDetectCommunicatorByClosingJob.keySet()).size();
if (currentRunningJobs >= maxAllowedRunningJobs) {
int currentRunningJobs = processByAllocation.size();
if (currentRunningJobs > maxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
RestStatus.TOO_MANY_REQUESTS);
}
@ -390,7 +396,7 @@ public class AutodetectProcessManager extends AbstractComponent {
}
throw e;
}
return new AutodetectCommunicator(job, jobTask, process, new StateStreamer(client), dataCountsReporter, processor, handler,
return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler,
xContentRegistry, autodetectWorkerExecutor);
}
@ -429,31 +435,34 @@ public class AutodetectProcessManager extends AbstractComponent {
String jobId = jobTask.getJobId();
long allocationId = jobTask.getAllocationId();
logger.debug("Attempting to close job [{}], because [{}]", jobId, reason);
// don't remove the communicator immediately, because we need to ensure it's in the
// map of closing communicators before it's removed from the map of running ones
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(allocationId);
if (communicator == null) {
logger.debug("Cannot close: no active autodetect process for job [{}]", jobId);
return;
}
// keep a record of the job, so that it can still be killed while closing
autoDetectCommunicatorByClosingJob.putIfAbsent(allocationId, communicator);
communicator = autoDetectCommunicatorByOpenJob.remove(allocationId);
if (communicator == null) {
// if we get here a simultaneous close request beat us to the remove() call
logger.debug("Already closing autodetect process for job [{}]", jobId);
// don't remove the process context immediately, because we need to ensure
// it is reachable to enable killing a job while it is closing
ProcessContext processContext = processByAllocation.get(allocationId);
if (processContext == null) {
logger.debug("Cannot close job [{}] as it has already been closed", jobId);
return;
}
processContext.tryLock();
processContext.setDying();
processContext.unlock();
if (reason == null) {
logger.info("Closing job [{}]", jobId);
} else {
logger.info("Closing job [{}], because [{}]", jobId, reason);
}
AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
if (communicator == null) {
logger.debug("Job [{}] is being closed before its process is started", jobId);
jobTask.markAsCompleted();
return;
}
try {
communicator.close(restart, reason);
autoDetectCommunicatorByClosingJob.remove(allocationId);
processByAllocation.remove(allocationId);
} catch (Exception e) {
logger.warn("[" + jobId + "] Exception closing autodetect process", e);
setJobState(jobTask, JobState.FAILED);
@ -462,15 +471,29 @@ public class AutodetectProcessManager extends AbstractComponent {
}
int numberOfOpenJobs() {
return autoDetectCommunicatorByOpenJob.size();
return (int) processByAllocation.values().stream()
.filter(p -> p.getState() != ProcessContext.ProcessStateName.DYING)
.count();
}
boolean jobHasActiveAutodetectProcess(JobTask jobTask) {
return autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()) != null;
return getAutodetectCommunicator(jobTask) != null;
}
private AutodetectCommunicator getAutodetectCommunicator(JobTask jobTask) {
return processByAllocation.getOrDefault(jobTask.getAllocationId(), new ProcessContext(jobTask)).getAutodetectCommunicator();
}
private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) {
ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId());
if (processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
return processContext.getAutodetectCommunicator();
}
return null;
}
public Optional<Duration> jobOpenTime(JobTask jobTask) {
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());
AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask);
if (communicator == null) {
return Optional.empty();
}
@ -516,7 +539,7 @@ public class AutodetectProcessManager extends AbstractComponent {
}
public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(JobTask jobTask) {
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());
AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask);
if (communicator == null) {
return Optional.empty();
}
@ -597,6 +620,5 @@ public class AutodetectProcessManager extends AbstractComponent {
awaitTermination.countDown();
}
}
}
}


@ -39,7 +39,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
private final Client client;
private final Environment env;


@ -0,0 +1,195 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* The process context that encapsulates the job task, the process state and the autodetect communicator.
*/
final class ProcessContext {
private static final Logger LOGGER = Loggers.getLogger(ProcessContext.class);
private final ReentrantLock lock = new ReentrantLock();
private final JobTask jobTask;
private volatile AutodetectCommunicator autodetectCommunicator;
private volatile ProcessState state;
ProcessContext(JobTask jobTask) {
this.jobTask = jobTask;
this.state = new ProcessNotRunningState();
}
JobTask getJobTask() {
return jobTask;
}
AutodetectCommunicator getAutodetectCommunicator() {
return autodetectCommunicator;
}
private void setAutodetectCommunicator(AutodetectCommunicator autodetectCommunicator) {
this.autodetectCommunicator = autodetectCommunicator;
}
ProcessStateName getState() {
return state.getName();
}
private void setState(ProcessState state) {
this.state = state;
}
void tryLock() {
try {
if (lock.tryLock(NativeAutodetectProcessFactory.PROCESS_STARTUP_TIMEOUT.getSeconds(), TimeUnit.SECONDS) == false) {
LOGGER.error("Failed to acquire process lock for job [{}]", jobTask.getJobId());
throw ExceptionsHelper.serverError("Failed to acquire process lock for job [" + jobTask.getJobId() + "]");
}
} catch (InterruptedException e) {
throw new ElasticsearchException(e);
}
}
void unlock() {
lock.unlock();
}
void setRunning(AutodetectCommunicator autodetectCommunicator) {
assert lock.isHeldByCurrentThread();
state.setRunning(this, autodetectCommunicator);
}
void setDying() {
assert lock.isHeldByCurrentThread();
state.setDying(this);
}
KillBuilder newKillBuilder() {
return new ProcessContext.KillBuilder();
}
class KillBuilder {
private boolean awaitCompletion;
private boolean finish;
private boolean silent;
private String reason;
KillBuilder setAwaitCompletion(boolean awaitCompletion) {
this.awaitCompletion = awaitCompletion;
return this;
}
KillBuilder setFinish(boolean finish) {
this.finish = finish;
return this;
}
KillBuilder setSilent(boolean silent) {
this.silent = silent;
return this;
}
KillBuilder setReason(String reason) {
this.reason = reason;
return this;
}
void kill() {
if (autodetectCommunicator == null) {
return;
}
String jobId = jobTask.getJobId();
if (silent == false) {
String extraInfo = (state.getName() == ProcessStateName.DYING) ? " while closing" : "";
if (reason == null) {
LOGGER.info("Killing job [{}]{}", jobId, extraInfo);
} else {
LOGGER.info("Killing job [{}]{}, because [{}]", jobId, extraInfo, reason);
}
}
try {
autodetectCommunicator.killProcess(awaitCompletion, finish);
} catch (IOException e) {
LOGGER.error("[{}] Failed to kill autodetect process for job", jobId);
}
}
}
enum ProcessStateName {
NOT_RUNNING, RUNNING, DYING
}
private interface ProcessState {
void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator);
void setDying(ProcessContext processContext);
ProcessStateName getName();
}
private static class ProcessNotRunningState implements ProcessState {
@Override
public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) {
processContext.setAutodetectCommunicator(autodetectCommunicator);
processContext.setState(new ProcessRunningState());
}
@Override
public void setDying(ProcessContext processContext) {
processContext.setState(new ProcessDyingState());
}
@Override
public ProcessStateName getName() {
return ProcessStateName.NOT_RUNNING;
}
}
private static class ProcessRunningState implements ProcessState {
@Override
public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) {
LOGGER.debug("Process set to [running] while it was already in that state");
}
@Override
public void setDying(ProcessContext processContext) {
processContext.setState(new ProcessDyingState());
}
@Override
public ProcessStateName getName() {
return ProcessStateName.RUNNING;
}
}
private static class ProcessDyingState implements ProcessState {
@Override
public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) {
LOGGER.debug("Process set to [running] while it was in [dying]");
}
@Override
public void setDying(ProcessContext processContext) {
LOGGER.debug("Process set to [dying] while it was already in that state");
}
@Override
public ProcessStateName getName() {
return ProcessStateName.DYING;
}
}
}
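
To tie the scattered hunks together, a condensed, hypothetical view of how AutodetectProcessManager drives the new ProcessContext; the wrapper class ProcessContextUsageSketch is illustrative only, while the ProcessContext and KillBuilder calls are the ones introduced in this commit.

package org.elasticsearch.xpack.ml.job.process.autodetect;

// Illustrative only: condenses the open/close/kill call sequences from
// AutodetectProcessManager into one place. It has to live in this package
// because ProcessContext and its methods are package-private.
class ProcessContextUsageSketch {

    // Open path: the lock is held while the process is created, so a concurrent
    // close cannot slip in between "job opened" and "process launched".
    void openPath(ProcessContext processContext, AutodetectCommunicator communicator) {
        processContext.tryLock();
        try {
            processContext.setRunning(communicator); // NOT_RUNNING -> RUNNING
        } finally {
            processContext.unlock();
        }
    }

    // Close path: tryLock() blocks until any in-flight launch has finished,
    // after which the context is marked DYING.
    void closePath(ProcessContext processContext) {
        processContext.tryLock();
        processContext.setDying(); // RUNNING or NOT_RUNNING -> DYING
        processContext.unlock();
    }

    // Kill path: the builder carries the flags that used to be plain parameters
    // of AutodetectProcessManager.killProcess().
    void killPath(ProcessContext processContext, boolean awaitCompletion, String reason) {
        processContext.newKillBuilder()
            .setAwaitCompletion(awaitCompletion)
            .setFinish(true)
            .setReason(reason)
            .kill();
    }
}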


@ -198,9 +198,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
((ActionListener<Boolean>) invocation.getArguments()[0]).onResponse(true);
return null;
}).when(dataCountsReporter).finishReporting(any());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess, stateStreamer,
return new AutodetectCommunicator(createJobDetails(), autodetectProcess, stateStreamer,
dataCountsReporter, autoDetectResultProcessor, finishHandler,
new NamedXContentRegistry(Collections.emptyList()), executorService);
}


@ -459,7 +459,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
when(communicator.getJobTask()).thenReturn(jobTask);
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),