diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java index af99df06ea3..aaf5373536b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java @@ -15,7 +15,7 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Date; +import java.time.Instant; import java.util.Objects; /** @@ -41,7 +41,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable { static { PARSER.declareString(CppLogMessage::setLogger, LOGGER_FIELD); - PARSER.declareField(CppLogMessage::setTimestamp, p -> new Date(p.longValue()), TIMESTAMP_FIELD, ValueType.LONG); + PARSER.declareField(CppLogMessage::setTimestamp, p -> Instant.ofEpochMilli(p.longValue()), TIMESTAMP_FIELD, ValueType.LONG); PARSER.declareString(CppLogMessage::setLevel, LEVEL_FIELD); PARSER.declareLong(CppLogMessage::setPid, PID_FIELD); PARSER.declareString(CppLogMessage::setThread, THREAD_FIELD); @@ -58,7 +58,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable { public static final ParseField TYPE = new ParseField("cpp_log_message"); private String logger = ""; - private Date timestamp; + private Instant timestamp; private String level = ""; private long pid = 0; private String thread = ""; @@ -69,12 +69,12 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable { private long line = 0; public CppLogMessage() { - timestamp = new Date(); + timestamp = Instant.now(); } public CppLogMessage(StreamInput in) throws IOException { logger = in.readString(); - timestamp = new Date(in.readVLong()); + timestamp = Instant.ofEpochMilli(in.readVLong()); level = in.readString(); pid = in.readVLong(); thread = in.readString(); @@ -88,7 +88,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(logger); - out.writeVLong(timestamp.getTime()); + out.writeVLong(timestamp.toEpochMilli()); out.writeString(level); out.writeVLong(pid); out.writeString(thread); @@ -103,7 +103,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(LOGGER_FIELD.getPreferredName(), logger); - builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp.getTime()); + builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp.toEpochMilli()); builder.field(LEVEL_FIELD.getPreferredName(), level); builder.field(PID_FIELD.getPreferredName(), pid); builder.field(THREAD_FIELD.getPreferredName(), thread); @@ -124,11 +124,11 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable { this.logger = logger; } - public Date getTimestamp() { + public Instant getTimestamp() { return this.timestamp; } - public void setTimestamp(Date d) { + public void setTimestamp(Instant d) { this.timestamp = d; } @@ -202,6 +202,20 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable { this.line = line; } + /** + * Definition of similar message in order to summarize them. + * + * Note: Assuming line and file are already unique, paranoia: check that + * line logging is enabled. + * + * @param other + * message to compare with + * @return true if messages are similar + */ + public boolean isSimilarTo(CppLogMessage other) { + return other != null && line > 0 && line == other.line && file.equals(other.file) && level.equals(other.level); + } + @Override public int hashCode() { return Objects.hash(logger, timestamp, level, pid, thread, message, clazz, method, file, line); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java index d35f1ec18ed..c43bb1afbfa 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java @@ -24,6 +24,8 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Deque; import java.util.Objects; import java.util.concurrent.CountDownLatch; @@ -40,6 +42,7 @@ public class CppLogMessageHandler implements Closeable { private static final Logger LOGGER = Loggers.getLogger(CppLogMessageHandler.class); private static final int DEFAULT_READBUF_SIZE = 1024; private static final int DEFAULT_ERROR_STORE_SIZE = 5; + private static final long MAX_MESSAGE_INTERVAL_SECONDS = 10; private final String jobId; private final InputStream inputStream; @@ -48,6 +51,7 @@ public class CppLogMessageHandler implements Closeable { private final Deque errorStore; private final CountDownLatch pidLatch; private final CountDownLatch cppCopyrightLatch; + private MessageSummary lastMessageSummary = new MessageSummary(); private volatile boolean hasLogStreamEnded; private volatile boolean seenFatalError; private volatile long pid; @@ -101,6 +105,11 @@ public class CppLogMessageHandler implements Closeable { } } finally { hasLogStreamEnded = true; + + // check if there is some leftover from log summarization + if (lastMessageSummary.count > 0) { + logSummarizedMessage(); + } } } @@ -213,6 +222,31 @@ public class CppLogMessageHandler implements Closeable { cppCopyright = latestMessage; cppCopyrightLatch.countDown(); } + + // get out of here quickly if level isn't of interest + if (!LOGGER.isEnabled(level)) { + return; + } + + // log message summarization is disabled for debug + if (!LOGGER.isDebugEnabled()) { + // log summarization: log 1st message, count all consecutive messages arriving + // in a certain time window and summarize them as 1 message + if (msg.isSimilarTo(lastMessageSummary.message) + && (lastMessageSummary.timestamp.until(msg.getTimestamp(), ChronoUnit.SECONDS) < MAX_MESSAGE_INTERVAL_SECONDS)) { + + // this is a repeated message, so do not log it, but count + lastMessageSummary.count++; + lastMessageSummary.message = msg; + return; + // not similar, flush last summary if necessary + } else if (lastMessageSummary.count > 0) { + // log last message with summary + logSummarizedMessage(); + } + + lastMessageSummary.reset(msg.getTimestamp(), msg, level); + } // TODO: Is there a way to preserve the original timestamp when re-logging? if (jobId != null) { LOGGER.log(level, "[{}] [{}/{}] [{}@{}] {}", jobId, msg.getLogger(), latestPid, msg.getFile(), msg.getLine(), @@ -230,6 +264,31 @@ public class CppLogMessageHandler implements Closeable { } } + private void logSummarizedMessage() { + // edge case: for 1 repeat, only log the message as is + if (lastMessageSummary.count > 1) { + if (jobId != null) { + LOGGER.log(lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {} | repeated [{}]", jobId, + lastMessageSummary.message.getLogger(), lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(), + lastMessageSummary.message.getLine(), lastMessageSummary.message.getMessage(), lastMessageSummary.count); + } else { + LOGGER.log(lastMessageSummary.level, "[{}/{}] [{}@{}] {} | repeated [{}]", lastMessageSummary.message.getLogger(), + lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(), lastMessageSummary.message.getLine(), + lastMessageSummary.message.getMessage(), lastMessageSummary.count); + } + } else { + if (jobId != null) { + LOGGER.log(lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {}", jobId, lastMessageSummary.message.getLogger(), + lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(), lastMessageSummary.message.getLine(), + lastMessageSummary.message.getMessage()); + } else { + LOGGER.log(lastMessageSummary.level, "[{}/{}] [{}@{}] {}", lastMessageSummary.message.getLogger(), + lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(), lastMessageSummary.message.getLine(), + lastMessageSummary.message.getMessage()); + } + } + } + private void storeError(String error) { if (Strings.isNullOrEmpty(error) || errorStoreSize <= 0) { return; @@ -248,4 +307,25 @@ public class CppLogMessageHandler implements Closeable { } return -1; } + + private static class MessageSummary { + Instant timestamp; + int count; + CppLogMessage message; + Level level; + + MessageSummary() { + this.timestamp = Instant.EPOCH; + this.message = null; + this.count = 0; + this.level = Level.OFF; + } + + void reset(Instant timestamp, CppLogMessage message, Level level) { + this.timestamp = timestamp; + this.message = message; + this.count = 0; + this.level = level; + } + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java index 3e7ce2a5d41..3cfcd5e32a1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java @@ -5,7 +5,11 @@ */ package org.elasticsearch.xpack.ml.job.process.logging; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -16,6 +20,25 @@ import java.util.concurrent.TimeoutException; public class CppLogMessageHandlerTests extends ESTestCase { + private static final String TEST_MESSAGE_NOISE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\"," + + "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 1\",\"class\":\"ml\"," + + "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.cc\",\"line\":333}\n"; + private static final String TEST_MESSAGE_NOISE_DIFFERENT_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445," + + "\"level\":\"INFO\",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 2\",\"class\":\"ml\"," + + "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.cc\",\"line\":333}\n"; + private static final String TEST_MESSAGE_NOISE_DIFFERENT_LEVEL = "{\"logger\":\"controller\",\"timestamp\":1478261151445," + + "\"level\":\"ERROR\",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 3\",\"class\":\"ml\"," + + "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.cc\",\"line\":333}\n"; + private static final String TEST_MESSAGE_OTHER_NOISE = "{\"logger\":\"controller\",\"timestamp\":1478261151446," + + "\"level\":\"INFO\",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 4\",\"class\":\"ml\"," + + "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.h\",\"line\":333}\n"; + private static final String TEST_MESSAGE_SOMETHING = "{\"logger\":\"controller\",\"timestamp\":1478261151447,\"level\":\"INFO\"" + + ",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 5\",\"class\":\"ml\"," + + "\"method\":\"core::Something\",\"file\":\"Something.cc\",\"line\":555}\n"; + private static final String TEST_MESSAGE_NOISE_DEBUG = "{\"logger\":\"controller\",\"timestamp\":1478261151448,\"level\":\"DEBUG\"," + + "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 6\",\"class\":\"ml\"," + + "\"method\":\"core::SomeNoiseMake\",\"file\":\"Noisemaker.cc\",\"line\":333}\n"; + public void testParse() throws IOException, TimeoutException { String testData = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," @@ -52,4 +75,125 @@ public class CppLogMessageHandlerTests extends ESTestCase { } } } + + public void testThrottlingSummary() throws IllegalAccessException, TimeoutException, IOException { + + InputStream is = new ByteArrayInputStream(String.join("", + TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, + TEST_MESSAGE_NOISE_DEBUG, TEST_MESSAGE_OTHER_NOISE, TEST_MESSAGE_SOMETHING) + .getBytes(StandardCharsets.UTF_8)); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1")); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1 | repeated [5]")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 4")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 5")); + + executeLoggingTest(is, mockAppender, Level.INFO); + } + + public void testThrottlingSummaryOneRepeat() throws IllegalAccessException, TimeoutException, IOException { + + InputStream is = new ByteArrayInputStream(String + .join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DEBUG, TEST_MESSAGE_OTHER_NOISE, + TEST_MESSAGE_SOMETHING) + .getBytes(StandardCharsets.UTF_8)); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1")); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1 | repeated [1]")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 4")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 5")); + + executeLoggingTest(is, mockAppender, Level.INFO); + } + + public void testThrottlingSummaryLevelChanges() throws IllegalAccessException, TimeoutException, IOException { + + InputStream is = new ByteArrayInputStream(String + .join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DIFFERENT_LEVEL, + TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DEBUG, + TEST_MESSAGE_OTHER_NOISE, TEST_MESSAGE_SOMETHING) + .getBytes(StandardCharsets.UTF_8)); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1 | repeated [2]")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.ERROR, + "[test_throttling] * message 3")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1 | repeated [3]")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test5", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 4")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test6", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 5")); + + executeLoggingTest(is, mockAppender, Level.INFO); + } + + public void testThrottlingLastMessageRepeast() throws IllegalAccessException, TimeoutException, IOException { + + InputStream is = new ByteArrayInputStream(String.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, + TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DIFFERENT_MESSAGE).getBytes(StandardCharsets.UTF_8)); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 2 | repeated [5]")); + + executeLoggingTest(is, mockAppender, Level.INFO); + } + + public void testThrottlingDebug() throws IllegalAccessException, TimeoutException, IOException { + + InputStream is = new ByteArrayInputStream(String.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, + TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DEBUG) + .getBytes(StandardCharsets.UTF_8)); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.DEBUG, + "[test_throttling] * message 6")); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.INFO, + "[test_throttling] * message 1 | repeated [5]")); + + executeLoggingTest(is, mockAppender, Level.DEBUG); + } + + private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level) throws IOException { + Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class); + Loggers.addAppender(cppMessageLogger, mockAppender); + + Level oldLevel = cppMessageLogger.getLevel(); + Loggers.setLevel(cppMessageLogger, level); + try (CppLogMessageHandler handler = new CppLogMessageHandler("test_throttling", is)) { + handler.tailStream(); + } finally { + Loggers.removeAppender(cppMessageLogger, mockAppender); + Loggers.setLevel(cppMessageLogger, oldLevel); + mockAppender.stop(); + } + + mockAppender.assertAllExpectationsMatched(); + } } + diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java index ab43630697e..a2ec8a76fad 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java @@ -14,7 +14,7 @@ public class CppLogMessageTests extends AbstractSerializingTestCase 0); + assertTrue(msg.getTimestamp().toString(), msg.getTimestamp().toEpochMilli() > 0); assertEquals("", msg.getLevel()); assertEquals(0, msg.getPid()); assertEquals("", msg.getThread());