[ML] Set jobs to FAILED state if C++ process dies unexpectedly (elastic/x-pack-elasticsearch#876)

Previously a `kill -9` on the `autodetect` process associated with a
job would leave the job in the OPENED state.

Now, if the C++ process dies before a request to close the job is made,
the job state is set to FAILED.

For this purpose, C++ process death is defined as end-of-file on the
log stream. (Technically, it would be possible to get end-of-file on
the log stream while the C++ process was still running, but this
would also represent an unexpected and undesirable situation.)

Original commit: elastic/x-pack-elasticsearch@2b74c56a79
This commit is contained in:
David Roberts 2017-03-29 16:23:58 +01:00 committed by GitHub
parent de061597fa
commit 979d232faa
8 changed files with 63 additions and 28 deletions

View File

@ -286,7 +286,8 @@ public class MachineLearning implements ActionPlugin {
throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e); throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e);
} }
} else { } else {
autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, ignoreDowntime, executorService) -> autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters,
ignoreDowntime, executorService, onProcessCrash) ->
new BlackHoleAutodetectProcess(); new BlackHoleAutodetectProcess();
// factor of 1.0 makes renormalization a no-op // factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization, normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,

View File

@ -27,8 +27,11 @@ public interface AutodetectProcessFactory {
* @param filters The filters to push to the native process * @param filters The filters to push to the native process
* @param ignoreDowntime Should gaps in data be treated as anomalous or as a maintenance window after a job re-start * @param ignoreDowntime Should gaps in data be treated as anomalous or as a maintenance window after a job re-start
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @param onProcessCrash Callback to execute if the process stops unexpectedly
* @return The process * @return The process
*/ */
AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters, AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters,
boolean ignoreDowntime, ExecutorService executorService); boolean ignoreDowntime,
ExecutorService executorService,
Runnable onProcessCrash);
} }

View File

@ -268,7 +268,8 @@ public class AutodetectProcessManager extends AbstractComponent {
threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization()); threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, executorService); autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime,
executorService, () -> setJobState(taskId, jobId, JobState.FAILED));
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor( AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());

View File

@ -29,6 +29,7 @@ import java.nio.file.Path;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -49,12 +50,16 @@ class NativeAutodetectProcess implements AutodetectProcess {
private final ZonedDateTime startTime; private final ZonedDateTime startTime;
private final int numberOfAnalysisFields; private final int numberOfAnalysisFields;
private final List<Path> filesToDelete; private final List<Path> filesToDelete;
private final Runnable onProcessCrash;
private volatile Future<?> logTailFuture; private volatile Future<?> logTailFuture;
private volatile Future<?> stateProcessorFuture; private volatile Future<?> stateProcessorFuture;
private volatile boolean processCloseInitiated;
private final AutodetectResultsParser resultsParser; private final AutodetectResultsParser resultsParser;
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream,
int numberOfAnalysisFields, List<Path> filesToDelete, AutodetectResultsParser resultsParser) { InputStream processOutStream, int numberOfAnalysisFields,
List<Path> filesToDelete, AutodetectResultsParser resultsParser,
Runnable onProcessCrash) {
this.jobId = jobId; this.jobId = jobId;
cppLogHandler = new CppLogMessageHandler(jobId, logStream); cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.processInStream = new BufferedOutputStream(processInStream); this.processInStream = new BufferedOutputStream(processInStream);
@ -64,6 +69,7 @@ class NativeAutodetectProcess implements AutodetectProcess {
this.numberOfAnalysisFields = numberOfAnalysisFields; this.numberOfAnalysisFields = numberOfAnalysisFields;
this.filesToDelete = filesToDelete; this.filesToDelete = filesToDelete;
this.resultsParser = resultsParser; this.resultsParser = resultsParser;
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
} }
public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) { public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) {
@ -71,7 +77,15 @@ class NativeAutodetectProcess implements AutodetectProcess {
try (CppLogMessageHandler h = cppLogHandler) { try (CppLogMessageHandler h = cppLogHandler) {
h.tailStream(); h.tailStream();
} catch (IOException e) { } catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), e); LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs",
new Object[] { jobId }), e);
} finally {
if (processCloseInitiated == false) {
// The log message doesn't say "crashed", as the process could have been killed
// by a user or other process (e.g. the Linux OOM killer)
LOGGER.error("[{}] autodetect process stopped unexpectedly", jobId);
onProcessCrash.run();
}
} }
}); });
stateProcessorFuture = executorService.submit(() -> { stateProcessorFuture = executorService.submit(() -> {
@ -117,6 +131,7 @@ class NativeAutodetectProcess implements AutodetectProcess {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try { try {
processCloseInitiated = true;
// closing its input causes the process to exit // closing its input causes the process to exit
processInStream.close(); processInStream.close();
// wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger

View File

@ -58,8 +58,11 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
} }
@Override @Override
public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters, public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot,
boolean ignoreDowntime, ExecutorService executorService) { Quantiles quantiles, Set<MlFilter> filters,
boolean ignoreDowntime,
ExecutorService executorService,
Runnable onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>(); List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.AUTODETECT, job.getId(), ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.AUTODETECT, job.getId(),
true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings)); true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
@ -70,7 +73,8 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
AutodetectResultsParser resultsParser = new AutodetectResultsParser(settings); AutodetectResultsParser resultsParser = new AutodetectResultsParser(settings);
NativeAutodetectProcess autodetect = new NativeAutodetectProcess( NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
processPipes.getProcessOutStream().get(), numberOfAnalysisFields, filesToDelete, resultsParser processPipes.getProcessOutStream().get(), numberOfAnalysisFields, filesToDelete,
resultsParser, onProcessCrash
); );
try { try {
autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());

View File

@ -37,6 +37,7 @@ class NativeNormalizerProcess implements NormalizerProcess {
private final OutputStream processInStream; private final OutputStream processInStream;
private final InputStream processOutStream; private final InputStream processOutStream;
private final LengthEncodedWriter recordWriter; private final LengthEncodedWriter recordWriter;
private volatile boolean processCloseInitiated;
private Future<?> logTailThread; private Future<?> logTailThread;
NativeNormalizerProcess(String jobId, Settings settings, InputStream logStream, OutputStream processInStream, NativeNormalizerProcess(String jobId, Settings settings, InputStream logStream, OutputStream processInStream,
@ -51,7 +52,14 @@ class NativeNormalizerProcess implements NormalizerProcess {
try (CppLogMessageHandler h = cppLogHandler) { try (CppLogMessageHandler h = cppLogHandler) {
h.tailStream(); h.tailStream();
} catch (IOException e) { } catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), e); LOGGER.error(new ParameterizedMessage("[{}] Error tailing normalizer process logs",
new Object[] { jobId }), e);
} finally {
if (processCloseInitiated == false) {
// The log message doesn't say "crashed", as the process could have been killed
// by a user or other process (e.g. the Linux OOM killer)
LOGGER.error("[{}] normalizer process stopped unexpectedly", jobId);
}
} }
}); });
} }
@ -64,6 +72,7 @@ class NativeNormalizerProcess implements NormalizerProcess {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try { try {
processCloseInitiated = true;
// closing its input causes the process to exit // closing its input causes the process to exit
processInStream.close(); processInStream.close();
// wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger

View File

@ -135,7 +135,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
when(autodetectProcess.isProcessAlive()).thenReturn(true); when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator()); when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator());
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess; AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
Settings.Builder settings = Settings.builder(); Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
@ -297,7 +298,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id")); when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess; AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, persistentTasksService, normalizerFactory, persistentTasksService,

View File

@ -16,7 +16,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -51,11 +50,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
} }
public void testProcessStartTime() throws Exception { public void testProcessStartTime() throws Exception {
InputStream logStream = Mockito.mock(InputStream.class); InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class), mock(OutputStream.class), mock(InputStream.class),
NUMBER_ANALYSIS_FIELDS, null, new AutodetectResultsParser(Settings.EMPTY))) { NUMBER_ANALYSIS_FIELDS, null,
new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
ZonedDateTime startTime = process.getProcessStartTime(); ZonedDateTime startTime = process.getProcessStartTime();
@ -69,13 +69,13 @@ public class NativeAutodetectProcessTests extends ESTestCase {
} }
public void testWriteRecord() throws IOException { public void testWriteRecord() throws IOException {
InputStream logStream = Mockito.mock(InputStream.class); InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
String[] record = {"r1", "r2", "r3", "r4", "r5"}; String[] record = {"r1", "r2", "r3", "r4", "r5"};
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(Settings.EMPTY))) { new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
process.writeRecord(record); process.writeRecord(record);
@ -102,12 +102,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
} }
public void testFlush() throws IOException { public void testFlush() throws IOException {
InputStream logStream = Mockito.mock(InputStream.class); InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(Settings.EMPTY))) { new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
InterimResultsParams params = InterimResultsParams.builder().build(); InterimResultsParams params = InterimResultsParams.builder().build();
@ -119,12 +119,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
} }
public void testWriteResetBucketsControlMessage() throws IOException { public void testWriteResetBucketsControlMessage() throws IOException {
InputStream logStream = Mockito.mock(InputStream.class); InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(Settings.EMPTY))) { new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty()); DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty());
@ -137,12 +137,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
} }
public void testWriteUpdateConfigMessage() throws IOException { public void testWriteUpdateConfigMessage() throws IOException {
InputStream logStream = Mockito.mock(InputStream.class); InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1); when(logStream.read(new byte[1024])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(Settings.EMPTY))) { new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) {
process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(StateProcessor.class), mock(InputStream.class));
process.writeUpdateModelPlotMessage(new ModelPlotConfig()); process.writeUpdateModelPlotMessage(new ModelPlotConfig());