[ML] simple log throttler for autodetect logging (elastic/x-pack-elasticsearch#1323)

Adds a simple log throttler for autodetect logging, summarizes log messages if
they repeat often in a short time period. Throttler gets disabled for debug logging.

relates to: https://github.com/elastic/machine-learning-cpp/issues/111

Original commit: elastic/x-pack-elasticsearch@6729b1fd7c
This commit is contained in:
Hendrik Muhs 2017-05-10 09:25:05 +02:00 committed by GitHub
parent 940ca229aa
commit c1016f3c3d
4 changed files with 248 additions and 10 deletions

View File

@ -15,7 +15,7 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Date;
import java.time.Instant;
import java.util.Objects;
/**
@ -41,7 +41,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable {
static {
PARSER.declareString(CppLogMessage::setLogger, LOGGER_FIELD);
PARSER.declareField(CppLogMessage::setTimestamp, p -> new Date(p.longValue()), TIMESTAMP_FIELD, ValueType.LONG);
PARSER.declareField(CppLogMessage::setTimestamp, p -> Instant.ofEpochMilli(p.longValue()), TIMESTAMP_FIELD, ValueType.LONG);
PARSER.declareString(CppLogMessage::setLevel, LEVEL_FIELD);
PARSER.declareLong(CppLogMessage::setPid, PID_FIELD);
PARSER.declareString(CppLogMessage::setThread, THREAD_FIELD);
@ -58,7 +58,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable {
public static final ParseField TYPE = new ParseField("cpp_log_message");
private String logger = "";
private Date timestamp;
private Instant timestamp;
private String level = "";
private long pid = 0;
private String thread = "";
@ -69,12 +69,12 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable {
private long line = 0;
public CppLogMessage() {
timestamp = new Date();
timestamp = Instant.now();
}
public CppLogMessage(StreamInput in) throws IOException {
logger = in.readString();
timestamp = new Date(in.readVLong());
timestamp = Instant.ofEpochMilli(in.readVLong());
level = in.readString();
pid = in.readVLong();
thread = in.readString();
@ -88,7 +88,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(logger);
out.writeVLong(timestamp.getTime());
out.writeVLong(timestamp.toEpochMilli());
out.writeString(level);
out.writeVLong(pid);
out.writeString(thread);
@ -103,7 +103,7 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(LOGGER_FIELD.getPreferredName(), logger);
builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp.getTime());
builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp.toEpochMilli());
builder.field(LEVEL_FIELD.getPreferredName(), level);
builder.field(PID_FIELD.getPreferredName(), pid);
builder.field(THREAD_FIELD.getPreferredName(), thread);
@ -124,11 +124,11 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable {
this.logger = logger;
}
public Date getTimestamp() {
public Instant getTimestamp() {
return this.timestamp;
}
public void setTimestamp(Date d) {
public void setTimestamp(Instant d) {
this.timestamp = d;
}
@ -202,6 +202,20 @@ public class CppLogMessage extends ToXContentToBytes implements Writeable {
this.line = line;
}
/**
* Definition of similar message in order to summarize them.
*
* Note: Assuming line and file are already unique, paranoia: check that
* line logging is enabled.
*
* @param other
* message to compare with
* @return true if messages are similar
*/
public boolean isSimilarTo(CppLogMessage other) {
return other != null && line > 0 && line == other.line && file.equals(other.file) && level.equals(other.level);
}
@Override
public int hashCode() {
return Objects.hash(logger, timestamp, level, pid, thread, message, clazz, method, file, line);

View File

@ -24,6 +24,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
@ -40,6 +42,7 @@ public class CppLogMessageHandler implements Closeable {
private static final Logger LOGGER = Loggers.getLogger(CppLogMessageHandler.class);
private static final int DEFAULT_READBUF_SIZE = 1024;
private static final int DEFAULT_ERROR_STORE_SIZE = 5;
private static final long MAX_MESSAGE_INTERVAL_SECONDS = 10;
private final String jobId;
private final InputStream inputStream;
@ -48,6 +51,7 @@ public class CppLogMessageHandler implements Closeable {
private final Deque<String> errorStore;
private final CountDownLatch pidLatch;
private final CountDownLatch cppCopyrightLatch;
private MessageSummary lastMessageSummary = new MessageSummary();
private volatile boolean hasLogStreamEnded;
private volatile boolean seenFatalError;
private volatile long pid;
@ -101,6 +105,11 @@ public class CppLogMessageHandler implements Closeable {
}
} finally {
hasLogStreamEnded = true;
// check if there is some leftover from log summarization
if (lastMessageSummary.count > 0) {
logSummarizedMessage();
}
}
}
@ -213,6 +222,31 @@ public class CppLogMessageHandler implements Closeable {
cppCopyright = latestMessage;
cppCopyrightLatch.countDown();
}
// get out of here quickly if level isn't of interest
if (!LOGGER.isEnabled(level)) {
return;
}
// log message summarization is disabled for debug
if (!LOGGER.isDebugEnabled()) {
// log summarization: log 1st message, count all consecutive messages arriving
// in a certain time window and summarize them as 1 message
if (msg.isSimilarTo(lastMessageSummary.message)
&& (lastMessageSummary.timestamp.until(msg.getTimestamp(), ChronoUnit.SECONDS) < MAX_MESSAGE_INTERVAL_SECONDS)) {
// this is a repeated message, so do not log it, but count
lastMessageSummary.count++;
lastMessageSummary.message = msg;
return;
// not similar, flush last summary if necessary
} else if (lastMessageSummary.count > 0) {
// log last message with summary
logSummarizedMessage();
}
lastMessageSummary.reset(msg.getTimestamp(), msg, level);
}
// TODO: Is there a way to preserve the original timestamp when re-logging?
if (jobId != null) {
LOGGER.log(level, "[{}] [{}/{}] [{}@{}] {}", jobId, msg.getLogger(), latestPid, msg.getFile(), msg.getLine(),
@ -230,6 +264,31 @@ public class CppLogMessageHandler implements Closeable {
}
}
private void logSummarizedMessage() {
// edge case: for 1 repeat, only log the message as is
if (lastMessageSummary.count > 1) {
if (jobId != null) {
LOGGER.log(lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {} | repeated [{}]", jobId,
lastMessageSummary.message.getLogger(), lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(),
lastMessageSummary.message.getLine(), lastMessageSummary.message.getMessage(), lastMessageSummary.count);
} else {
LOGGER.log(lastMessageSummary.level, "[{}/{}] [{}@{}] {} | repeated [{}]", lastMessageSummary.message.getLogger(),
lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(), lastMessageSummary.message.getLine(),
lastMessageSummary.message.getMessage(), lastMessageSummary.count);
}
} else {
if (jobId != null) {
LOGGER.log(lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {}", jobId, lastMessageSummary.message.getLogger(),
lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(), lastMessageSummary.message.getLine(),
lastMessageSummary.message.getMessage());
} else {
LOGGER.log(lastMessageSummary.level, "[{}/{}] [{}@{}] {}", lastMessageSummary.message.getLogger(),
lastMessageSummary.message.getPid(), lastMessageSummary.message.getFile(), lastMessageSummary.message.getLine(),
lastMessageSummary.message.getMessage());
}
}
}
private void storeError(String error) {
if (Strings.isNullOrEmpty(error) || errorStoreSize <= 0) {
return;
@ -248,4 +307,25 @@ public class CppLogMessageHandler implements Closeable {
}
return -1;
}
private static class MessageSummary {
Instant timestamp;
int count;
CppLogMessage message;
Level level;
MessageSummary() {
this.timestamp = Instant.EPOCH;
this.message = null;
this.count = 0;
this.level = Level.OFF;
}
void reset(Instant timestamp, CppLogMessage message, Level level) {
this.timestamp = timestamp;
this.message = message;
this.count = 0;
this.level = level;
}
}
}

View File

@ -5,7 +5,11 @@
*/
package org.elasticsearch.xpack.ml.job.process.logging;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -16,6 +20,25 @@ import java.util.concurrent.TimeoutException;
public class CppLogMessageHandlerTests extends ESTestCase {
private static final String TEST_MESSAGE_NOISE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\","
+ "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 1\",\"class\":\"ml\","
+ "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.cc\",\"line\":333}\n";
private static final String TEST_MESSAGE_NOISE_DIFFERENT_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,"
+ "\"level\":\"INFO\",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 2\",\"class\":\"ml\","
+ "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.cc\",\"line\":333}\n";
private static final String TEST_MESSAGE_NOISE_DIFFERENT_LEVEL = "{\"logger\":\"controller\",\"timestamp\":1478261151445,"
+ "\"level\":\"ERROR\",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 3\",\"class\":\"ml\","
+ "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.cc\",\"line\":333}\n";
private static final String TEST_MESSAGE_OTHER_NOISE = "{\"logger\":\"controller\",\"timestamp\":1478261151446,"
+ "\"level\":\"INFO\",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 4\",\"class\":\"ml\","
+ "\"method\":\"core::SomeNoiseMaker\",\"file\":\"Noisemaker.h\",\"line\":333}\n";
private static final String TEST_MESSAGE_SOMETHING = "{\"logger\":\"controller\",\"timestamp\":1478261151447,\"level\":\"INFO\""
+ ",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 5\",\"class\":\"ml\","
+ "\"method\":\"core::Something\",\"file\":\"Something.cc\",\"line\":555}\n";
private static final String TEST_MESSAGE_NOISE_DEBUG = "{\"logger\":\"controller\",\"timestamp\":1478261151448,\"level\":\"DEBUG\","
+ "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 6\",\"class\":\"ml\","
+ "\"method\":\"core::SomeNoiseMake\",\"file\":\"Noisemaker.cc\",\"line\":333}\n";
public void testParse() throws IOException, TimeoutException {
String testData = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
@ -52,4 +75,125 @@ public class CppLogMessageHandlerTests extends ESTestCase {
}
}
}
public void testThrottlingSummary() throws IllegalAccessException, TimeoutException, IOException {
InputStream is = new ByteArrayInputStream(String.join("",
TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE,
TEST_MESSAGE_NOISE_DEBUG, TEST_MESSAGE_OTHER_NOISE, TEST_MESSAGE_SOMETHING)
.getBytes(StandardCharsets.UTF_8));
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1"));
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1 | repeated [5]"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 4"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 5"));
executeLoggingTest(is, mockAppender, Level.INFO);
}
public void testThrottlingSummaryOneRepeat() throws IllegalAccessException, TimeoutException, IOException {
InputStream is = new ByteArrayInputStream(String
.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DEBUG, TEST_MESSAGE_OTHER_NOISE,
TEST_MESSAGE_SOMETHING)
.getBytes(StandardCharsets.UTF_8));
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1"));
mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1 | repeated [1]"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 4"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 5"));
executeLoggingTest(is, mockAppender, Level.INFO);
}
public void testThrottlingSummaryLevelChanges() throws IllegalAccessException, TimeoutException, IOException {
InputStream is = new ByteArrayInputStream(String
.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DIFFERENT_LEVEL,
TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DEBUG,
TEST_MESSAGE_OTHER_NOISE, TEST_MESSAGE_SOMETHING)
.getBytes(StandardCharsets.UTF_8));
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1 | repeated [2]"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.ERROR,
"[test_throttling] * message 3"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1 | repeated [3]"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test5", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 4"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test6", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 5"));
executeLoggingTest(is, mockAppender, Level.INFO);
}
public void testThrottlingLastMessageRepeast() throws IllegalAccessException, TimeoutException, IOException {
InputStream is = new ByteArrayInputStream(String.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE,
TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DIFFERENT_MESSAGE).getBytes(StandardCharsets.UTF_8));
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 2 | repeated [5]"));
executeLoggingTest(is, mockAppender, Level.INFO);
}
public void testThrottlingDebug() throws IllegalAccessException, TimeoutException, IOException {
InputStream is = new ByteArrayInputStream(String.join("", TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE,
TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE, TEST_MESSAGE_NOISE_DEBUG)
.getBytes(StandardCharsets.UTF_8));
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", CppLogMessageHandler.class.getName(), Level.DEBUG,
"[test_throttling] * message 6"));
mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test3", CppLogMessageHandler.class.getName(), Level.INFO,
"[test_throttling] * message 1 | repeated [5]"));
executeLoggingTest(is, mockAppender, Level.DEBUG);
}
private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level) throws IOException {
Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class);
Loggers.addAppender(cppMessageLogger, mockAppender);
Level oldLevel = cppMessageLogger.getLevel();
Loggers.setLevel(cppMessageLogger, level);
try (CppLogMessageHandler handler = new CppLogMessageHandler("test_throttling", is)) {
handler.tailStream();
} finally {
Loggers.removeAppender(cppMessageLogger, mockAppender);
Loggers.setLevel(cppMessageLogger, oldLevel);
mockAppender.stop();
}
mockAppender.assertAllExpectationsMatched();
}
}

View File

@ -14,7 +14,7 @@ public class CppLogMessageTests extends AbstractSerializingTestCase<CppLogMessag
public void testDefaultConstructor() {
CppLogMessage msg = new CppLogMessage();
assertEquals("", msg.getLogger());
assertTrue(msg.getTimestamp().toString(), msg.getTimestamp().getTime() > 0);
assertTrue(msg.getTimestamp().toString(), msg.getTimestamp().toEpochMilli() > 0);
assertEquals("", msg.getLevel());
assertEquals(0, msg.getPid());
assertEquals("", msg.getThread());