HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.

Chen Liang 2018-09-27 10:12:37 -07:00
parent d4adf921a3
commit 91803c8216
6 changed files with 163 additions and 15 deletions

Changed file: LogThrottlingHelper.java

@@ -272,6 +272,40 @@ public class LogThrottlingHelper {
    }
  }

+  /**
+   * Return the summary information for given index.
+   *
+   * @param recorderName The name of the recorder.
+   * @param idx The index value.
+   * @return The summary information.
+   */
+  public SummaryStatistics getCurrentStats(String recorderName, int idx) {
+    LoggingAction currentLog = currentLogs.get(recorderName);
+    if (currentLog != null) {
+      return currentLog.getStats(idx);
+    }
+    return null;
+  }
+
+  /**
+   * Helper function to create a message about how many log statements were
+   * suppressed in the provided log action. If no statements were suppressed,
+   * this returns an empty string. The message has the format (without quotes):
+   *
+   * <p/>' (suppressed logging <i>{suppression_count}</i> times)'
+   *
+   * @param action The log action to produce a message about.
+   * @return A message about suppression within this action.
+   */
+  public static String getLogSupressionMessage(LogAction action) {
+    if (action.getCount() > 1) {
+      return " (suppressed logging " + (action.getCount() - 1) + " times)";
+    } else {
+      return "";
+    }
+  }
+
  /**
   * A standard log action which keeps track of all of the values which have
   * been logged. This is also used for internal bookkeeping via its private

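The two helpers added above are the building blocks for the throttled logging in the HDFS files that follow. As a minimal sketch of that pattern (not part of this commit; the class, logger, and message text are invented), a caller records every occurrence, emits the statement only when shouldLog() returns true, and appends getLogSupressionMessage() so the one emitted line accounts for the suppressed ones:

import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThrottledLoggingSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(ThrottledLoggingSketch.class);

  // Allow at most one "tailed edits" message every 5 seconds.
  private final LogThrottlingHelper tailLogHelper = new LogThrottlingHelper(5000);

  void onEditsTailed(long numEdits) {
    // Every occurrence is recorded; numEdits keeps being aggregated even
    // while the statement itself is suppressed.
    LogAction action = tailLogHelper.record(numEdits);
    if (action.shouldLog()) {
      // getStats(0) summarizes the first recorded value (numEdits) over this
      // call plus every call suppressed since the last emitted message.
      LOG.info("Tailed " + (long) action.getStats(0).getSum() + " edits over "
          + action.getCount() + " batches"
          + LogThrottlingHelper.getLogSupressionMessage(action));
    }
  }
}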
Changed file: QuorumJournalManager.java

@@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
  private int outputBufferCapacity = 512 * 1024;
  private final URLConnectionFactory connectionFactory;

+  /** Limit logging about input stream selection to every 5 seconds max. */
+  private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper selectInputStreamLogHelper =
+      new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
+
  @VisibleForTesting
  public QuorumJournalManager(Configuration conf,
      URI uri,
@@ -568,8 +575,12 @@ public class QuorumJournalManager implements JournalManager {
          "ID " + fromTxnId);
      return;
    }
-    LOG.info("Selected loggers with >= " + maxAllowedTxns +
-        " transactions starting from " + fromTxnId);
+    LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
+    if (logAction.shouldLog()) {
+      LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
+          "starting from lowest txn ID " + logAction.getStats(0).getMin() +
+          LogThrottlingHelper.getLogSupressionMessage(logAction));
+    }
    PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (GetJournaledEditsResponseProto resp : responseMap.values()) {

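Because each call to record(fromTxnId) above also stores the txn ID as a value, the message that finally gets emitted can report the lowest starting txn ID seen over the whole suppressed window rather than only the current one. A small self-contained illustration (class name, recorder name, and numbers invented; the expected output assumes the suppression semantics exercised by the new test in TestFSEditLogLoader below):

import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

public class LowestTxnIdSketch {
  public static void main(String[] args) {
    LogThrottlingHelper helper = new LogThrottlingHelper(5000);
    // {timestampMs, fromTxnId}: three calls inside one window, one after it.
    long[][] calls = {{0, 120}, {1000, 40}, {2000, 300}, {6000, 90}};
    for (long[] call : calls) {
      // Timestamps are passed explicitly here, as FSEditLogLoader does below.
      LogAction action = helper.record("select", call[0], call[1]);
      if (action.shouldLog()) {
        System.out.println("starting from lowest txn ID "
            + (long) action.getStats(0).getMin()
            + LogThrottlingHelper.getLogSupressionMessage(action));
      }
    }
    // Expected: "starting from lowest txn ID 120" at t=0, nothing at t=1000
    // and t=2000, then "starting from lowest txn ID 40 (suppressed logging
    // 2 times)" at t=6000, since suppressed values fold into the same stats.
  }
}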
Changed file: FSEditLogLoader.java

@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
-import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.FilterInputStream;
import java.io.IOException;
@@ -113,27 +112,45 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.util.Timer;

+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

+import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader {
  static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
  static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec

+  /** Limit logging about edit loading to every 5 seconds max. */
+  @VisibleForTesting
+  static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditsLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
  private final FSNamesystem fsNamesys;
  private final BlockManager blockManager;
+  private final Timer timer;
  private long lastAppliedTxId;
  /** Total number of end transactions loaded. */
  private int totalEdits = 0;

  public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
+    this(fsNamesys, lastAppliedTxId, new Timer());
+  }
+
+  @VisibleForTesting
+  FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
    this.fsNamesys = fsNamesys;
    this.blockManager = fsNamesys.getBlockManager();
    this.lastAppliedTxId = lastAppliedTxId;
+    this.timer = timer;
  }

  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
@@ -154,14 +171,26 @@ public class FSEditLogLoader {
    prog.beginStep(Phase.LOADING_EDITS, step);
    fsNamesys.writeLock();
    try {
-      long startTime = monotonicNow();
-      FSImage.LOG.info("Start loading edits file " + edits.getName()
-          + " maxTxnsToRead = " + maxTxnsToRead);
+      long startTime = timer.monotonicNow();
+      LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
+      if (preLogAction.shouldLog()) {
+        FSImage.LOG.info("Start loading edits file " + edits.getName()
+            + " maxTxnsToRead = " + maxTxnsToRead +
+            LogThrottlingHelper.getLogSupressionMessage(preLogAction));
+      }
      long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
          maxTxnsToRead, startOpt, recovery);
-      FSImage.LOG.info("Edits file " + edits.getName()
-          + " of size " + edits.length() + " edits # " + numEdits
-          + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
+      long endTime = timer.monotonicNow();
+      LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
+          numEdits, edits.length(), endTime - startTime);
+      if (postLogAction.shouldLog()) {
+        FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
+            "total size {}, total edits {}, total load time {} ms",
+            postLogAction.getCount(), edits.getName(),
+            postLogAction.getStats(1).getSum(),
+            postLogAction.getStats(0).getSum(),
+            postLogAction.getStats(2).getSum());
+      }
      return numEdits;
    } finally {
      edits.close();
@@ -202,7 +231,7 @@ public class FSEditLogLoader {
    Step step = createStartupProgressStep(in);
    prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
    Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
-    long lastLogTime = monotonicNow();
+    long lastLogTime = timer.monotonicNow();
    long lastInodeId = fsNamesys.dir.getLastInodeId();
    try {
@@ -282,7 +311,7 @@ public class FSEditLogLoader {
        }
        // log progress
        if (op.hasTransactionId()) {
-          long now = monotonicNow();
+          long now = timer.monotonicNow();
          if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
            long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
            int percent = Math.round((float) deltaTxId / numTxns * 100);

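The "pre" and "post" recorder names above let one helper throttle both the start-of-load and end-of-load statements while keeping separate statistics for each. On the "post" side, getStats(i) indexes the i-th value passed to record("post", ...): 0 is the edit count, 1 is the file size, and 2 is the load time, so the emitted line reports totals over the current load plus any suppressed ones. A self-contained sketch (class name and figures invented; expected values assume the suppression semantics exercised by the new test below):

import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

public class EditLoadStatsSketch {
  public static void main(String[] args) {
    LogThrottlingHelper helper = new LogThrottlingHelper(5000);
    // {timestampMs, numEdits, sizeBytes, loadTimeMs} per load; the second
    // load falls inside the first one's 5-second window.
    long[][] loads = {{0, 10, 1000, 40}, {1000, 20, 3000, 60}, {6000, 5, 500, 10}};
    for (long[] load : loads) {
      LogAction action = helper.record("post", load[0], load[1], load[2], load[3]);
      if (action.shouldLog()) {
        System.out.println("Loaded " + action.getCount() + " edits file(s),"
            + " total size " + action.getStats(1).getSum()     // bytes
            + ", total edits " + action.getStats(0).getSum()   // edit count
            + ", total load time " + action.getStats(2).getSum() + " ms");
      }
    }
    // Expected: the load at t=0 reports only itself; the load at t=1000 is
    // suppressed; the load at t=6000 reports 2 file(s), size 3500.0,
    // edits 25.0, load time 70.0 ms.
  }
}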
Changed file: FSImage.java

@@ -69,6 +69,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
@@ -123,6 +125,11 @@ public class FSImage implements Closeable {
  private final Set<Long> currentlyCheckpointing =
      Collections.<Long>synchronizedSet(new HashSet<Long>());

+  /** Limit logging about edit loading to every 5 seconds max. */
+  private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
  /**
   * Construct an FSImage
   * @param conf Configuration
@@ -884,8 +891,16 @@ public class FSImage implements Closeable {
    // Load latest edits
    for (EditLogInputStream editIn : editStreams) {
-      LOG.info("Reading " + editIn + " expecting start txid #" +
-          (lastAppliedTxId + 1));
+      LogAction logAction = loadEditLogHelper.record();
+      if (logAction.shouldLog()) {
+        String logSuppressed = "";
+        if (logAction.getCount() > 1) {
+          logSuppressed = "; suppressed logging for " +
+              (logAction.getCount() - 1) + " edit reads";
+        }
+        LOG.info("Reading " + editIn + " expecting start txid #" +
+            (lastAppliedTxId + 1) + logSuppressed);
+      }
      try {
        loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
            startOpt, recovery);

Changed file: RedundantEditLogInputStream.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;

+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

/**
 * A merged input stream that handles failover between different edit logs.
@@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
  private long prevTxId;
  private final EditLogInputStream[] streams;

+  /** Limit logging about fast forwarding the stream to every 5 seconds max. */
+  private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper fastForwardLoggingHelper =
+      new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
+
  /**
   * States that the RedundantEditLogInputStream can be in.
   *
@@ -174,8 +181,12 @@ class RedundantEditLogInputStream extends EditLogInputStream {
    case SKIP_UNTIL:
      try {
        if (prevTxId != HdfsServerConstants.INVALID_TXID) {
-          LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
-              "' to transaction ID " + (prevTxId + 1));
+          LogAction logAction = fastForwardLoggingHelper.record();
+          if (logAction.shouldLog()) {
+            LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+                "' to transaction ID " + (prevTxId + 1) +
+                LogThrottlingHelper.getLogSupressionMessage(logAction));
+          }
          streams[curIdx].skipUntil(prevTxId + 1);
        }
      } catch (IOException e) {

Changed file: TestFSEditLogLoader.java

@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;

import java.io.BufferedInputStream;
import java.io.File;
@@ -61,8 +64,10 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Level;
+import org.apache.hadoop.util.FakeTimer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -101,6 +106,7 @@ public class TestFSEditLogLoader {
  private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
  private static final int NUM_DATA_NODES = 0;
+  private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";

  private final ErasureCodingPolicy testECPolicy
      = StripedFileTestUtil.getDefaultECPolicy();
@@ -799,4 +805,46 @@ public class TestFSEditLogLoader {
      }
    }
  }
+
+  @Test
+  public void testLoadFSEditLogThrottling() throws Exception {
+    FSNamesystem namesystem = mock(FSNamesystem.class);
+    namesystem.dir = mock(FSDirectory.class);
+    FakeTimer timer = new FakeTimer();
+    FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
+
+    LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
+    loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
+    assertTrue(capture.getOutput().contains("Start loading edits file " +
+        FAKE_EDIT_STREAM_NAME));
+    assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
+    assertFalse(capture.getOutput().contains("suppressed"));
+
+    timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
+    capture.clearOutput();
+    loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
+    assertFalse(capture.getOutput().contains("Start loading edits file"));
+    assertFalse(capture.getOutput().contains("edits file(s)"));
+
+    timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
+    capture.clearOutput();
+    loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
+    assertTrue(capture.getOutput().contains("Start loading edits file " +
+        FAKE_EDIT_STREAM_NAME));
+    assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
+    assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
+    assertTrue(capture.getOutput().contains("total size 2.0"));
+  }
+
+  private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx)
+      throws IOException {
+    EditLogInputStream fakeStream = mock(EditLogInputStream.class);
+    when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
+    when(fakeStream.getFirstTxId()).thenReturn(startTx);
+    when(fakeStream.getLastTxId()).thenReturn(endTx);
+    when(fakeStream.length()).thenReturn(1L);
+    return fakeStream;
+  }
}
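The test above works because the loader obtains its timestamps from the injected Timer and passes them straight into record(), so FakeTimer.advance() alone decides whether the helper is inside or outside its 5-second window. A stripped-down sketch of those mechanics (FakeTimer and LogThrottlingHelper are the real classes used above; the driver class and printed values are illustrative, and the expected results assume the same suppression behavior the test asserts):

import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.apache.hadoop.util.FakeTimer;

public class FakeTimerThrottleSketch {
  public static void main(String[] args) {
    FakeTimer timer = new FakeTimer();
    LogThrottlingHelper helper = new LogThrottlingHelper(5000);

    // Logs immediately, as in the first load above.
    LogAction first = helper.record("pre", timer.monotonicNow());
    System.out.println(first.shouldLog());                        // true

    timer.advance(2500);                                          // still inside the window
    System.out.println(
        helper.record("pre", timer.monotonicNow()).shouldLog());  // false

    timer.advance(5000);                                          // past the window
    LogAction third = helper.record("pre", timer.monotonicNow());
    System.out.println(third.shouldLog());                        // true
    System.out.println(
        LogThrottlingHelper.getLogSupressionMessage(third));      // " (suppressed logging 1 times)"
  }
}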