[ML] Fix a race condition simultaneous close requests are made for a job (elastic/x-pack-elasticsearch#2913)

When simultaneous close requests were made for the same job it was possible
that one of the requests would inappropriately log error messages about the
job having failed.  This change prevents that problem, whilst continuing to
adhere to the requirement that close requests for already closing jobs do not
return until the close request that is doing the work completes.

relates elastic/x-pack-elasticsearch#2912

Original commit: elastic/x-pack-elasticsearch@513b7fa1d6
This commit is contained in:
David Roberts 2017-11-07 14:30:59 +00:00 committed by GitHub
parent 8e5855e62e
commit 027f64b221
3 changed files with 92 additions and 31 deletions

View File

@ -325,12 +325,12 @@ public class AutodetectProcessManager extends AbstractComponent {
} }
private void createProcessAndSetRunning(ProcessContext processContext, AutodetectParams params, Consumer<Exception> handler) { private void createProcessAndSetRunning(ProcessContext processContext, AutodetectParams params, Consumer<Exception> handler) {
// At this point we lock the process context until the process has been started.
// The reason behind this is to ensure closing the job does not happen before
// the process is started as that can result to the job getting seemingly closed
// but the actual process is hanging alive.
processContext.tryLock();
try { try {
// At this point we lock the process context until the process has been started.
// The reason behind this is to ensure closing the job does not happen before
// the process is started as that can result to the job getting seemingly closed
// but the actual process is hanging alive.
processContext.tryLock();
AutodetectCommunicator communicator = create(processContext.getJobTask(), params, handler); AutodetectCommunicator communicator = create(processContext.getJobTask(), params, handler);
processContext.setRunning(communicator); processContext.setRunning(communicator);
} finally { } finally {
@ -444,29 +444,36 @@ public class AutodetectProcessManager extends AbstractComponent {
} }
processContext.tryLock(); processContext.tryLock();
processContext.setDying();
processContext.unlock();
if (reason == null) {
logger.info("Closing job [{}]", jobId);
} else {
logger.info("Closing job [{}], because [{}]", jobId, reason);
}
AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
if (communicator == null) {
logger.debug("Job [{}] is being closed before its process is started", jobId);
jobTask.markAsCompleted();
return;
}
try { try {
if (processContext.setDying() == false) {
logger.debug("Cannot close job [{}] as it has already been closed", jobId);
return;
}
if (reason == null) {
logger.info("Closing job [{}]", jobId);
} else {
logger.info("Closing job [{}], because [{}]", jobId, reason);
}
AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
if (communicator == null) {
logger.debug("Job [{}] is being closed before its process is started", jobId);
jobTask.markAsCompleted();
return;
}
communicator.close(restart, reason); communicator.close(restart, reason);
processByAllocation.remove(allocationId); processByAllocation.remove(allocationId);
} catch (Exception e) { } catch (Exception e) {
logger.warn("[" + jobId + "] Exception closing autodetect process", e); logger.warn("[" + jobId + "] Exception closing autodetect process", e);
setJobState(jobTask, JobState.FAILED); setJobState(jobTask, JobState.FAILED);
throw ExceptionsHelper.serverError("Exception closing autodetect process", e); throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
} finally {
// to ensure the contract that multiple simultaneous close calls for the same job wait until
// the job is closed is honoured, hold the lock throughout the close procedure so that another
// thread that gets into this method blocks until the first thread has finished closing the job
processContext.unlock();
} }
} }

View File

@ -72,9 +72,9 @@ final class ProcessContext {
state.setRunning(this, autodetectCommunicator); state.setRunning(this, autodetectCommunicator);
} }
void setDying() { boolean setDying() {
assert lock.isHeldByCurrentThread(); assert lock.isHeldByCurrentThread();
state.setDying(this); return state.setDying(this);
} }
KillBuilder newKillBuilder() { KillBuilder newKillBuilder() {
@ -134,21 +134,29 @@ final class ProcessContext {
} }
private interface ProcessState { private interface ProcessState {
void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator); /**
void setDying(ProcessContext processContext); * @return was a state change made?
* */
boolean setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator);
/**
* @return was a state change made?
*/
boolean setDying(ProcessContext processContext);
ProcessStateName getName(); ProcessStateName getName();
} }
private static class ProcessNotRunningState implements ProcessState { private static class ProcessNotRunningState implements ProcessState {
@Override @Override
public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { public boolean setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) {
processContext.setAutodetectCommunicator(autodetectCommunicator); processContext.setAutodetectCommunicator(autodetectCommunicator);
processContext.setState(new ProcessRunningState()); processContext.setState(new ProcessRunningState());
return true;
} }
@Override @Override
public void setDying(ProcessContext processContext) { public boolean setDying(ProcessContext processContext) {
processContext.setState(new ProcessDyingState()); processContext.setState(new ProcessDyingState());
return true;
} }
@Override @Override
@ -160,13 +168,15 @@ final class ProcessContext {
private static class ProcessRunningState implements ProcessState { private static class ProcessRunningState implements ProcessState {
@Override @Override
public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { public boolean setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) {
LOGGER.debug("Process set to [running] while it was already in that state"); LOGGER.debug("Process set to [running] while it was already in that state");
return false;
} }
@Override @Override
public void setDying(ProcessContext processContext) { public boolean setDying(ProcessContext processContext) {
processContext.setState(new ProcessDyingState()); processContext.setState(new ProcessDyingState());
return true;
} }
@Override @Override
@ -178,13 +188,15 @@ final class ProcessContext {
private static class ProcessDyingState implements ProcessState { private static class ProcessDyingState implements ProcessState {
@Override @Override
public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { public boolean setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) {
LOGGER.debug("Process set to [running] while it was in [dying]"); LOGGER.debug("Process set to [running] while it was in [dying]");
return false;
} }
@Override @Override
public void setDying(ProcessContext processContext) { public boolean setDying(ProcessContext processContext) {
LOGGER.debug("Process set to [dying] while it was already in that state"); LOGGER.debug("Process set to [dying] while it was already in that state");
return false;
} }
@Override @Override

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask; import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
@ -60,6 +61,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -308,6 +310,46 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals(0, manager.numberOfOpenJobs()); assertEquals(0, manager.numberOfOpenJobs());
} }
// DEBUG logging makes it possible to see exactly how the two threads
// interleaved in the AutodetectProcessManager.close() call
@TestLogging("org.elasticsearch.xpack.ml.job.process.autodetect:DEBUG")
public void testCanCloseClosingJob() throws Exception {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AtomicInteger numberOfCommunicatorCloses = new AtomicInteger(0);
doAnswer(invocationOnMock -> {
numberOfCommunicatorCloses.incrementAndGet();
// This increases the chance of the two threads both getting into
// the middle of the AutodetectProcessManager.close() method
Thread.yield();
return null;
}).when(communicator).close(anyBoolean(), anyString());
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfOpenJobs());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
assertEquals(1, manager.numberOfOpenJobs());
// Close the job in a separate thread
Thread closeThread = new Thread(() -> manager.closeJob(jobTask, false, "in separate thread"));
closeThread.start();
Thread.yield();
// Also close the job in the current thread, so that we have two simultaneous close requests
manager.closeJob(jobTask, false, "in main test thread");
closeThread.join(500);
assertFalse(closeThread.isAlive());
// Only one of the threads should have called AutodetectCommunicator.close()
assertEquals(1, numberOfCommunicatorCloses.get());
assertEquals(0, manager.numberOfOpenJobs());
}
public void testCanKillClosingJob() throws Exception { public void testCanKillClosingJob() throws Exception {
CountDownLatch closeStartedLatch = new CountDownLatch(1); CountDownLatch closeStartedLatch = new CountDownLatch(1);
CountDownLatch killLatch = new CountDownLatch(1); CountDownLatch killLatch = new CountDownLatch(1);