From 979d232faae754c9561763aced26c15acb51e5f9 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 29 Mar 2017 16:23:58 +0100 Subject: [PATCH] [ML] Set jobs to FAILED state if C++ process dies unexpectedly (elastic/x-pack-elasticsearch#876) Previously a `kill -9` on the `autodetect` process associated with a job would leave the job in the OPENED state. Now if the C++ process dies before a request to close the job is made then the job state is set to FAILED. For this purpose C++ process death is defined as end-of-file on the log stream. (Technically it would be possible to get end-of-file on the log stream while the C++ process was still running, but this would also represent an unexpected and undesirable situation.) Original commit: elastic/x-pack-elasticsearch@2b74c56a79140c859ad82c410a1365686cf48a26 --- .../xpack/ml/MachineLearning.java | 3 +- .../autodetect/AutodetectProcessFactory.java | 5 ++- .../autodetect/AutodetectProcessManager.java | 3 +- .../autodetect/NativeAutodetectProcess.java | 21 ++++++++++-- .../NativeAutodetectProcessFactory.java | 10 ++++-- .../normalizer/NativeNormalizerProcess.java | 11 ++++++- .../AutodetectProcessManagerTests.java | 6 ++-- .../NativeAutodetectProcessTests.java | 32 +++++++++---------- 8 files changed, 63 insertions(+), 28 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 7e1e22b2564..daec1dac9a3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -286,7 +286,8 @@ public class MachineLearning implements ActionPlugin { throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e); } } else { - autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, ignoreDowntime, executorService) -> + autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, + ignoreDowntime, executorService, onProcessCrash) -> new BlackHoleAutodetectProcess(); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java index 37d08506acb..30f8223008a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java @@ -27,8 +27,11 @@ public interface AutodetectProcessFactory { * @param filters The filters to push to the native process * @param ignoreDowntime Should gaps in data be treated as anomalous or as a maintenance window after a job re-start * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process + * @param onProcessCrash Callback to execute if the process stops unexpectedly * @return The process */ AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set filters, - boolean ignoreDowntime, ExecutorService executorService); + boolean ignoreDowntime, + ExecutorService executorService, + Runnable onProcessCrash); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 71c58974fd0..d0b6f6e8ef7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -268,7 +268,8 @@ public class AutodetectProcessManager extends AbstractComponent { threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization()); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), - autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, executorService); + autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, + executorService, () -> setJobState(taskId, jobId, JobState.FAILED)); boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 234de084161..d2b9c9c68f0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -29,6 +29,7 @@ import java.nio.file.Path; import java.time.ZonedDateTime; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -49,12 +50,16 @@ class NativeAutodetectProcess implements AutodetectProcess { private final ZonedDateTime startTime; private final int numberOfAnalysisFields; private final List filesToDelete; + private final Runnable onProcessCrash; private volatile Future logTailFuture; private volatile Future stateProcessorFuture; + private volatile boolean processCloseInitiated; private final AutodetectResultsParser resultsParser; - NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, - int numberOfAnalysisFields, List filesToDelete, AutodetectResultsParser resultsParser) { + NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, + InputStream processOutStream, int numberOfAnalysisFields, + List filesToDelete, AutodetectResultsParser resultsParser, + Runnable onProcessCrash) { this.jobId = jobId; cppLogHandler = new CppLogMessageHandler(jobId, logStream); this.processInStream = new BufferedOutputStream(processInStream); @@ -64,6 +69,7 @@ class NativeAutodetectProcess implements AutodetectProcess { this.numberOfAnalysisFields = numberOfAnalysisFields; this.filesToDelete = filesToDelete; this.resultsParser = resultsParser; + this.onProcessCrash = Objects.requireNonNull(onProcessCrash); } public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) { @@ -71,7 +77,15 @@ class NativeAutodetectProcess implements AutodetectProcess { try (CppLogMessageHandler h = cppLogHandler) { h.tailStream(); } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), e); + LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs", + new Object[] { jobId }), e); + } finally { + if (processCloseInitiated == false) { + // The log message doesn't say "crashed", as the process could have been killed + // by a user or other process (e.g. the Linux OOM killer) + LOGGER.error("[{}] autodetect process stopped unexpectedly", jobId); + onProcessCrash.run(); + } } }); stateProcessorFuture = executorService.submit(() -> { @@ -117,6 +131,7 @@ class NativeAutodetectProcess implements AutodetectProcess { @Override public void close() throws IOException { try { + processCloseInitiated = true; // closing its input causes the process to exit processInStream.close(); // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index c91b628e14c..c9d8a0c8b15 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -58,8 +58,11 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory } @Override - public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set filters, - boolean ignoreDowntime, ExecutorService executorService) { + public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, + Quantiles quantiles, Set filters, + boolean ignoreDowntime, + ExecutorService executorService, + Runnable onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.AUTODETECT, job.getId(), true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings)); @@ -70,7 +73,8 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory AutodetectResultsParser resultsParser = new AutodetectResultsParser(settings); NativeAutodetectProcess autodetect = new NativeAutodetectProcess( job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), - processPipes.getProcessOutStream().get(), numberOfAnalysisFields, filesToDelete, resultsParser + processPipes.getProcessOutStream().get(), numberOfAnalysisFields, filesToDelete, + resultsParser, onProcessCrash ); try { autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java index 254d1a6cc51..f078957b4ec 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java @@ -37,6 +37,7 @@ class NativeNormalizerProcess implements NormalizerProcess { private final OutputStream processInStream; private final InputStream processOutStream; private final LengthEncodedWriter recordWriter; + private volatile boolean processCloseInitiated; private Future logTailThread; NativeNormalizerProcess(String jobId, Settings settings, InputStream logStream, OutputStream processInStream, @@ -51,7 +52,14 @@ class NativeNormalizerProcess implements NormalizerProcess { try (CppLogMessageHandler h = cppLogHandler) { h.tailStream(); } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), e); + LOGGER.error(new ParameterizedMessage("[{}] Error tailing normalizer process logs", + new Object[] { jobId }), e); + } finally { + if (processCloseInitiated == false) { + // The log message doesn't say "crashed", as the process could have been killed + // by a user or other process (e.g. the Linux OOM killer) + LOGGER.error("[{}] normalizer process stopped unexpectedly", jobId); + } } }); } @@ -64,6 +72,7 @@ class NativeNormalizerProcess implements NormalizerProcess { @Override public void close() throws IOException { try { + processCloseInitiated = true; // closing its input causes the process to exit processInStream.close(); // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 9199557d285..49a02249365 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -135,7 +135,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); when(autodetectProcess.isProcessAlive()).thenReturn(true); when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator()); - AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess; + AutodetectProcessFactory autodetectProcessFactory = + (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; Settings.Builder settings = Settings.builder(); settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, @@ -297,7 +298,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id")); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); - AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess; + AutodetectProcessFactory autodetectProcessFactory = + (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, persistentTasksService, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 6e7442400cc..94e55464bc6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.junit.Assert; import org.junit.Before; -import org.mockito.Mockito; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -51,11 +50,12 @@ public class NativeAutodetectProcessTests extends ESTestCase { } public void testProcessStartTime() throws Exception { - InputStream logStream = Mockito.mock(InputStream.class); + InputStream logStream = mock(InputStream.class); when(logStream.read(new byte[1024])).thenReturn(-1); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, - Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class), - NUMBER_ANALYSIS_FIELDS, null, new AutodetectResultsParser(Settings.EMPTY))) { + mock(OutputStream.class), mock(InputStream.class), + NUMBER_ANALYSIS_FIELDS, null, + new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); ZonedDateTime startTime = process.getProcessStartTime(); @@ -69,13 +69,13 @@ public class NativeAutodetectProcessTests extends ESTestCase { } public void testWriteRecord() throws IOException { - InputStream logStream = Mockito.mock(InputStream.class); + InputStream logStream = mock(InputStream.class); when(logStream.read(new byte[1024])).thenReturn(-1); String[] record = {"r1", "r2", "r3", "r4", "r5"}; ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, - bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(Settings.EMPTY))) { + bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), + new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.writeRecord(record); @@ -102,12 +102,12 @@ public class NativeAutodetectProcessTests extends ESTestCase { } public void testFlush() throws IOException { - InputStream logStream = Mockito.mock(InputStream.class); + InputStream logStream = mock(InputStream.class); when(logStream.read(new byte[1024])).thenReturn(-1); ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, - bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(Settings.EMPTY))) { + bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), + new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); InterimResultsParams params = InterimResultsParams.builder().build(); @@ -119,12 +119,12 @@ public class NativeAutodetectProcessTests extends ESTestCase { } public void testWriteResetBucketsControlMessage() throws IOException { - InputStream logStream = Mockito.mock(InputStream.class); + InputStream logStream = mock(InputStream.class); when(logStream.read(new byte[1024])).thenReturn(-1); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, - bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(Settings.EMPTY))) { + bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), + new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty()); @@ -137,12 +137,12 @@ public class NativeAutodetectProcessTests extends ESTestCase { } public void testWriteUpdateConfigMessage() throws IOException { - InputStream logStream = Mockito.mock(InputStream.class); + InputStream logStream = mock(InputStream.class); when(logStream.read(new byte[1024])).thenReturn(-1); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, - bos, Mockito.mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(Settings.EMPTY))) { + bos, mock(InputStream.class), NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), + new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); process.writeUpdateModelPlotMessage(new ModelPlotConfig());