From 2693c6a7300d447fb8c345cf86ebd6c665211e75 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 9 Nov 2017 15:47:23 +0100 Subject: [PATCH] [ML] improve logging for autodetect crashes (elastic/x-pack-elasticsearch#2866) Improve logging for unexpected autodetect termination (crash, OOM). Output on the log pipe that does not conform to the JSON log output format is treated as a fatal error and logged, so that the crash, as well as a proper error message if one is available, gets logged. Original commit: elastic/x-pack-elasticsearch@ae5d792d3ff2d390899fc26bba2405a2654dcf6d --- .../process/autodetect/AutodetectProcess.java | 14 ++++++++++ .../BlackHoleAutodetectProcess.java | 10 +++++++ .../autodetect/NativeAutodetectProcess.java | 6 ++++ .../output/AutoDetectResultProcessor.java | 8 ++++-- .../process/logging/CppLogMessageHandler.java | 22 ++++++++++++--- .../AutoDetectResultProcessorTests.java | 2 ++ .../logging/CppLogMessageHandlerTests.java | 28 ++++++++++++++----- 7 files changed, 77 insertions(+), 13 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 7d4078a035f..a8008552d4b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -125,6 +125,20 @@ public interface AutodetectProcess extends Closeable { */ boolean isProcessAlive(); + /** + * Check whether autodetect terminated given maximum 45ms for termination + * + * Processing errors are highly likely caused by autodetect being unexpectedly + * terminated. + * + * Workaround: As we can not easily check if autodetect is alive, we rely on + * the logPipe being ended. As the loghandler runs in another thread which + * might fall behind this one, we give it a grace period of 45ms. 
+ * + * @return false if process has ended for sure, true if it probably still runs + */ + boolean isProcessAliveAfterWaiting(); + /** * Read any content in the error output buffer. * @return An error message or empty String if no error. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 5eba87765f6..9d27c8bb62b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -144,6 +144,16 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { return open; } + @Override + public boolean isProcessAliveAfterWaiting() { + try { + Thread.sleep(45); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return open; + } + @Override public String readError() { return ""; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 9cee3db760a..74cb30ab5e8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -270,6 +270,12 @@ class NativeAutodetectProcess implements AutodetectProcess { return !cppLogHandler.hasLogStreamEnded(); } + @Override + public boolean isProcessAliveAfterWaiting() { + cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45)); + return isProcessAlive(); + } + @Override public String readError() { return cppLogHandler.getErrors(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java 
b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 2a907a0c652..f037e8c98eb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction; @@ -113,6 +112,9 @@ public class AutoDetectResultProcessor { if (processKilled) { throw e; } + if (process.isProcessAliveAfterWaiting() == false) { + throw e; + } LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e); } } @@ -135,6 +137,9 @@ public class AutoDetectResultProcessor { // but we now fully expect jobs to move between nodes without doing // all their graceful close activities. LOGGER.warn("[{}] some results not processed due to the process being killed", jobId); + } else if (process.isProcessAliveAfterWaiting() == false) { + // Don't log the stack trace to not shadow the root cause. + LOGGER.warn("[{}] some results not processed due to the termination of autodetect", jobId); } else { // We should only get here if the iterator throws in which // case parsing the autodetect output has failed. 
@@ -344,6 +349,5 @@ public class AutoDetectResultProcessor { public ModelSizeStats modelSizeStats() { return latestModelSizeStats; } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java index f6df58f385c..d0948dee88f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.logging; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -90,9 +91,9 @@ public class CppLogMessageHandler implements Closeable { * InputStream throws an exception. 
*/ public void tailStream() throws IOException { + XContent xContent = XContentFactory.xContent(XContentType.JSON); + BytesReference bytesRef = null; try { - XContent xContent = XContentFactory.xContent(XContentType.JSON); - BytesReference bytesRef = null; byte[] readBuf = new byte[readBufSize]; for (int bytesRead = inputStream.read(readBuf); bytesRead != -1; bytesRead = inputStream.read(readBuf)) { if (bytesRef == null) { @@ -110,6 +111,11 @@ public class CppLogMessageHandler implements Closeable { if (lastMessageSummary.count > 0) { logSummarizedMessage(); } + + // if the process crashed, a non-delimited JSON string might still be in the pipe + if (bytesRef != null) { + parseMessage(xContent, bytesRef); + } } } @@ -263,12 +269,20 @@ public class CppLogMessageHandler implements Closeable { } else { LOGGER.log(level, "[{}/{}] [{}@{}] {}", msg.getLogger(), latestPid, msg.getFile(), msg.getLine(), latestMessage); } + } catch (ParsingException e) { + String upstreamMessage = "Fatal error: '" + bytesRef.utf8ToString() + "'"; + if (upstreamMessage.contains("bad_alloc")) { + upstreamMessage += ", process ran out of memory."; + } + storeError(upstreamMessage); + seenFatalError = true; } catch (IOException e) { if (jobId != null) { - LOGGER.warn(new ParameterizedMessage("[{}] Failed to parse C++ log message: {}", + LOGGER.warn(new ParameterizedMessage("[{}] IO failure receiving C++ log message: {}", new Object[] {jobId, bytesRef.utf8ToString()}), e); } else { - LOGGER.warn(new ParameterizedMessage("Failed to parse C++ log message: {}", new Object[] {bytesRef.utf8ToString()}), e); + LOGGER.warn(new ParameterizedMessage("IO failure receiving C++ log message: {}", + new Object[] {bytesRef.utf8ToString()}), e); } } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 
c88451b3697..0b010112eba 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -325,6 +325,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false); when(iterator.next()).thenReturn(autodetectResult); AutodetectProcess process = mock(AutodetectProcess.class); + when(process.isProcessAlive()).thenReturn(true); + when(process.isProcessAliveAfterWaiting()).thenReturn(true); when(process.readAutodetectResults()).thenReturn(iterator); doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java index 96d9b66b15a..af2691d6f35 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java @@ -38,6 +38,7 @@ public class CppLogMessageHandlerTests extends ESTestCase { private static final String TEST_MESSAGE_NOISE_DEBUG = "{\"logger\":\"controller\",\"timestamp\":1478261151448,\"level\":\"DEBUG\"," + "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 6\",\"class\":\"ml\"," + "\"method\":\"core::SomeNoiseMake\",\"file\":\"Noisemaker.cc\",\"line\":333}\n"; + private static final String TEST_MESSAGE_NON_JSON_FATAL_ERROR = "Segmentation fault core dumped"; public void testParse() throws IOException, TimeoutException { @@ -96,7 +97,7 @@ public class CppLogMessageHandlerTests extends ESTestCase { mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", 
CppLogMessageHandler.class.getName(), Level.INFO, "[test_throttling] * message 5")); - executeLoggingTest(is, mockAppender, Level.INFO); + executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling"); } public void testThrottlingSummaryOneRepeat() throws IllegalAccessException, TimeoutException, IOException { @@ -117,7 +118,7 @@ public class CppLogMessageHandlerTests extends ESTestCase { mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO, "[test_throttling] * message 5")); - executeLoggingTest(is, mockAppender, Level.INFO); + executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling"); } public void testThrottlingSummaryLevelChanges() throws IllegalAccessException, TimeoutException, IOException { @@ -143,7 +144,7 @@ public class CppLogMessageHandlerTests extends ESTestCase { mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test6", CppLogMessageHandler.class.getName(), Level.INFO, "[test_throttling] * message 5")); - executeLoggingTest(is, mockAppender, Level.INFO); + executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling"); } public void testThrottlingLastMessageRepeast() throws IllegalAccessException, TimeoutException, IOException { @@ -158,7 +159,7 @@ public class CppLogMessageHandlerTests extends ESTestCase { mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO, "[test_throttling] * message 2 | repeated [5]")); - executeLoggingTest(is, mockAppender, Level.INFO); + executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling"); } public void testThrottlingDebug() throws IllegalAccessException, TimeoutException, IOException { @@ -176,7 +177,7 @@ public class CppLogMessageHandlerTests extends ESTestCase { mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.INFO, "[test_throttling] * message 1 | 
repeated [5]")); - executeLoggingTest(is, mockAppender, Level.DEBUG); + executeLoggingTest(is, mockAppender, Level.DEBUG, "test_throttling"); } public void testWaitForLogStreamClose() throws IOException { @@ -190,13 +191,26 @@ public class CppLogMessageHandlerTests extends ESTestCase { } } - private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level) throws IOException { + public void testParseFatalError() throws IOException, IllegalAccessException { + InputStream is = new ByteArrayInputStream(TEST_MESSAGE_NON_JSON_FATAL_ERROR.getBytes(StandardCharsets.UTF_8)); + + try (CppLogMessageHandler handler = new CppLogMessageHandler("test_error", is)) { + is.close(); + handler.tailStream(); + assertTrue(handler.seenFatalError()); + assertTrue(handler.getErrors().contains(TEST_MESSAGE_NON_JSON_FATAL_ERROR)); + assertTrue(handler.getErrors().contains("Fatal error")); + } + } + + private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level, String jobId) + throws IOException { Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class); Loggers.addAppender(cppMessageLogger, mockAppender); Level oldLevel = cppMessageLogger.getLevel(); Loggers.setLevel(cppMessageLogger, level); - try (CppLogMessageHandler handler = new CppLogMessageHandler("test_throttling", is)) { + try (CppLogMessageHandler handler = new CppLogMessageHandler(jobId, is)) { handler.tailStream(); } finally { Loggers.removeAppender(cppMessageLogger, mockAppender);