[ML] improve logging for autodetect crashes (elastic/x-pack-elasticsearch#2866)

Improving logging for unexpected autodetect termination (crash, oom). Output to the log pipe not conforming to the json log output format are treated as fatal error and logged, so that the crash as well as a proper error message if available gets logged.

Original commit: elastic/x-pack-elasticsearch@ae5d792d3f
This commit is contained in:
Hendrik Muhs 2017-11-09 15:47:23 +01:00 committed by GitHub
parent 142b59a4d5
commit 2693c6a730
7 changed files with 77 additions and 13 deletions

View File

@ -125,6 +125,20 @@ public interface AutodetectProcess extends Closeable {
*/
boolean isProcessAlive();
/**
* Check whether autodetect terminated given maximum 45ms for termination
*
* Processing errors are highly likely caused by autodetect being unexpectedly
* terminated.
*
* Workaround: As we can not easily check if autodetect is alive, we rely on
* the logPipe being ended. As the loghandler runs in another thread which
* might fall behind this one, we give it a grace period of 45ms.
*
* @return false if process has ended for sure, true if it probably still runs
*/
boolean isProcessAliveAfterWaiting();
/**
* Read any content in the error output buffer.
* @return An error message or empty String if no error.

View File

@ -144,6 +144,16 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
return open;
}
@Override
public boolean isProcessAliveAfterWaiting() {
try {
Thread.sleep(45);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return open;
}
@Override
public String readError() {
return "";

View File

@ -270,6 +270,12 @@ class NativeAutodetectProcess implements AutodetectProcess {
return !cppLogHandler.hasLogStreamEnded();
}
@Override
public boolean isProcessAliveAfterWaiting() {
cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45));
return isProcessAlive();
}
@Override
public String readError() {
return cppLogHandler.getErrors();

View File

@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
@ -113,6 +112,9 @@ public class AutoDetectResultProcessor {
if (processKilled) {
throw e;
}
if (process.isProcessAliveAfterWaiting() == false) {
throw e;
}
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
}
}
@ -135,6 +137,9 @@ public class AutoDetectResultProcessor {
// but we now fully expect jobs to move between nodes without doing
// all their graceful close activities.
LOGGER.warn("[{}] some results not processed due to the process being killed", jobId);
} else if (process.isProcessAliveAfterWaiting() == false) {
// Don't log the stack trace to not shadow the root cause.
LOGGER.warn("[{}] some results not processed due to the termination of autodetect", jobId);
} else {
// We should only get here if the iterator throws in which
// case parsing the autodetect output has failed.
@ -344,6 +349,5 @@ public class AutoDetectResultProcessor {
public ModelSizeStats modelSizeStats() {
return latestModelSizeStats;
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.logging;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -90,9 +91,9 @@ public class CppLogMessageHandler implements Closeable {
* InputStream throws an exception.
*/
public void tailStream() throws IOException {
XContent xContent = XContentFactory.xContent(XContentType.JSON);
BytesReference bytesRef = null;
try {
XContent xContent = XContentFactory.xContent(XContentType.JSON);
BytesReference bytesRef = null;
byte[] readBuf = new byte[readBufSize];
for (int bytesRead = inputStream.read(readBuf); bytesRead != -1; bytesRead = inputStream.read(readBuf)) {
if (bytesRef == null) {
@ -110,6 +111,11 @@ public class CppLogMessageHandler implements Closeable {
if (lastMessageSummary.count > 0) {
logSummarizedMessage();
}
// if the process crashed, a non-delimited JSON string might still be in the pipe
if (bytesRef != null) {
parseMessage(xContent, bytesRef);
}
}
}
@ -263,12 +269,20 @@ public class CppLogMessageHandler implements Closeable {
} else {
LOGGER.log(level, "[{}/{}] [{}@{}] {}", msg.getLogger(), latestPid, msg.getFile(), msg.getLine(), latestMessage);
}
} catch (ParsingException e) {
String upstreamMessage = "Fatal error: '" + bytesRef.utf8ToString() + "'";
if (upstreamMessage.contains("bad_alloc")) {
upstreamMessage += ", process ran out of memory.";
}
storeError(upstreamMessage);
seenFatalError = true;
} catch (IOException e) {
if (jobId != null) {
LOGGER.warn(new ParameterizedMessage("[{}] Failed to parse C++ log message: {}",
LOGGER.warn(new ParameterizedMessage("[{}] IO failure receiving C++ log message: {}",
new Object[] {jobId, bytesRef.utf8ToString()}), e);
} else {
LOGGER.warn(new ParameterizedMessage("Failed to parse C++ log message: {}", new Object[] {bytesRef.utf8ToString()}), e);
LOGGER.warn(new ParameterizedMessage("IO failure receiving C++ log message: {}",
new Object[] {bytesRef.utf8ToString()}), e);
}
}
}

View File

@ -325,6 +325,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.isProcessAlive()).thenReturn(true);
when(process.isProcessAliveAfterWaiting()).thenReturn(true);
when(process.readAutodetectResults()).thenReturn(iterator);
doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any());

View File

@ -38,6 +38,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
private static final String TEST_MESSAGE_NOISE_DEBUG = "{\"logger\":\"controller\",\"timestamp\":1478261151448,\"level\":\"DEBUG\","
+ "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 6\",\"class\":\"ml\","
+ "\"method\":\"core::SomeNoiseMake\",\"file\":\"Noisemaker.cc\",\"line\":333}\n";
private static final String TEST_MESSAGE_NON_JSON_FATAL_ERROR = "Segmentation fault core dumped";
public void testParse() throws IOException, TimeoutException {
@ -96,7 +97,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 5"));
executeLoggingTest(is, mockAppender, Level.INFO);
executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling");
}
public void testThrottlingSummaryOneRepeat() throws IllegalAccessException, TimeoutException, IOException {
@ -117,7 +118,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 5"));
executeLoggingTest(is, mockAppender, Level.INFO);
executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling");
}
public void testThrottlingSummaryLevelChanges() throws IllegalAccessException, TimeoutException, IOException {
@ -143,7 +144,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test6", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 5"));
executeLoggingTest(is, mockAppender, Level.INFO);
executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling");
}
public void testThrottlingLastMessageRepeast() throws IllegalAccessException, TimeoutException, IOException {
@ -158,7 +159,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 2 | repeated [5]"));
executeLoggingTest(is, mockAppender, Level.INFO);
executeLoggingTest(is, mockAppender, Level.INFO, "test_throttling");
}
public void testThrottlingDebug() throws IllegalAccessException, TimeoutException, IOException {
@ -176,7 +177,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1 | repeated [5]"));
executeLoggingTest(is, mockAppender, Level.DEBUG);
executeLoggingTest(is, mockAppender, Level.DEBUG, "test_throttling");
}
public void testWaitForLogStreamClose() throws IOException {
@ -190,13 +191,26 @@ public class CppLogMessageHandlerTests extends ESTestCase {
}
}
private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level) throws IOException {
public void testParseFatalError() throws IOException, IllegalAccessException {
InputStream is = new ByteArrayInputStream(TEST_MESSAGE_NON_JSON_FATAL_ERROR.getBytes(StandardCharsets.UTF_8));
try (CppLogMessageHandler handler = new CppLogMessageHandler("test_error", is)) {
is.close();
handler.tailStream();
assertTrue(handler.seenFatalError());
assertTrue(handler.getErrors().contains(TEST_MESSAGE_NON_JSON_FATAL_ERROR));
assertTrue(handler.getErrors().contains("Fatal error"));
}
}
private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level, String jobId)
throws IOException {
Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class);
Loggers.addAppender(cppMessageLogger, mockAppender);
Level oldLevel = cppMessageLogger.getLevel();
Loggers.setLevel(cppMessageLogger, level);
try (CppLogMessageHandler handler = new CppLogMessageHandler("test_throttling", is)) {
try (CppLogMessageHandler handler = new CppLogMessageHandler(jobId, is)) {
handler.tailStream();
} finally {
Loggers.removeAppender(cppMessageLogger, mockAppender);