From 91803c82167407402832b1473e2bc42646818664 Mon Sep 17 00:00:00 2001
From: Chen Liang
Date: Thu, 27 Sep 2018 10:12:37 -0700
Subject: [PATCH] HDFS-13791. Limit logging frequency of edit tail related
 statements. Contributed by Erik Krogen.

---
 .../hadoop/log/LogThrottlingHelper.java       | 34 +++++++++++++
 .../qjournal/client/QuorumJournalManager.java | 15 +++++-
 .../hdfs/server/namenode/FSEditLogLoader.java | 47 ++++++++++++++----
 .../hadoop/hdfs/server/namenode/FSImage.java  | 19 +++++++-
 .../namenode/RedundantEditLogInputStream.java | 15 +++++-
 .../server/namenode/TestFSEditLogLoader.java  | 48 +++++++++++++++++++
 6 files changed, 163 insertions(+), 15 deletions(-)
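All call sites below follow the same LogThrottlingHelper contract: construct
the helper with a minimum interval between statements, call record() on every
event, and only emit the statement when shouldLog() returns true. A minimal
sketch of the pattern, not part of the patch (the EditBatchProcessor class is
invented for illustration; the helper API is the one used in the hunks below):

    import org.apache.hadoop.log.LogThrottlingHelper;
    import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class EditBatchProcessor {
      private static final Logger LOG =
          LoggerFactory.getLogger(EditBatchProcessor.class);
      // At most one statement per 5 seconds; intervening calls are aggregated.
      private final LogThrottlingHelper logHelper =
          new LogThrottlingHelper(5000);

      void process(long numEdits) {
        // Values passed to record() accumulate across suppressed calls and
        // are exposed on the returned LogAction via getStats(valueIndex).
        LogAction action = logHelper.record(numEdits);
        if (action.shouldLog()) {
          LOG.info("Processed " + (long) action.getStats(0).getSum() + " edits"
              + LogThrottlingHelper.getLogSupressionMessage(action));
        }
      }
    }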

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
index aa4e61c570b..591c3fb8a0b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
@@ -272,6 +272,40 @@ public class LogThrottlingHelper {
     }
   }
 
+  /**
+   * Return the summary information for the given recorder at the given
+   * index.
+   *
+   * @param recorderName The name of the recorder.
+   * @param idx The index value.
+   * @return The summary information.
+   */
+  public SummaryStatistics getCurrentStats(String recorderName, int idx) {
+    LoggingAction currentLog = currentLogs.get(recorderName);
+    if (currentLog != null) {
+      return currentLog.getStats(idx);
+    }
+
+    return null;
+  }
+
+  /**
+   * Helper function to create a message about how many log statements were
+   * suppressed in the provided log action. If no statements were suppressed,
+   * this returns an empty string. The message has the format (without
+   * quotes):
+   *
+   * <pre>
+   * ' (suppressed logging {suppression_count} times)'
+   * </pre>
+   *
+   * @param action The log action to produce a message about.
+   * @return A message about suppression within this action.
+   */
+  public static String getLogSupressionMessage(LogAction action) {
+    if (action.getCount() > 1) {
+      return " (suppressed logging " + (action.getCount() - 1) + " times)";
+    } else {
+      return "";
+    }
+  }
+
   /**
    * A standard log action which keeps track of all of the values which have
    * been logged. This is also used for internal bookkeeping via its private
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index f4177e378ff..5f6d45f8117 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
   private int outputBufferCapacity = 512 * 1024;
   private final URLConnectionFactory connectionFactory;
 
+  /** Limit logging about input stream selection to every 5 seconds max. */
+  private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper selectInputStreamLogHelper =
+      new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
+
   @VisibleForTesting
   public QuorumJournalManager(Configuration conf, URI uri,
@@ -568,8 +575,12 @@ public class QuorumJournalManager implements JournalManager {
           "ID " + fromTxnId);
       return;
     }
-    LOG.info("Selected loggers with >= " + maxAllowedTxns +
-        " transactions starting from " + fromTxnId);
+    LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
+    if (logAction.shouldLog()) {
+      LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
+          "starting from lowest txn ID " + logAction.getStats(0).getMin() +
+          LogThrottlingHelper.getLogSupressionMessage(logAction));
+    }
     PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
         JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (GetJournaledEditsResponseProto resp : responseMap.values()) {
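One behavioral change in the QuorumJournalManager hunk above: the statement no
longer reports the fromTxnId of whichever call happens to be logged. Because
fromTxnId is passed to record(), suppressed calls contribute their values too,
and getStats(0).getMin() is the lowest starting txn ID seen since the last
emitted statement. A hypothetical timeline under the 5-second interval (txn
IDs and times are invented; the behavior matches what the new test below
exercises):

    LogThrottlingHelper helper = new LogThrottlingHelper(5000);

    LogAction a = helper.record(100); // t=0s: shouldLog() == true; stats cover {100}
    a = helper.record(200);           // t=2s: suppressed, but 200 is still recorded
    a = helper.record(150);           // t=6s: interval elapsed; shouldLog() == true,
                                      // a.getCount() == 2 (the suppressed call plus
                                      // this one), a.getStats(0).getMin() == 150.0,
                                      // suffix: " (suppressed logging 1 times)"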
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 82e35bd353e..0d86b76652b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
-import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -113,27 +112,45 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.util.Timer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
+
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class FSEditLogLoader {
   static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
   static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
 
+  /** Limit logging about edit loading to every 5 seconds max. */
+  @VisibleForTesting
+  static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditsLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
   private final FSNamesystem fsNamesys;
   private final BlockManager blockManager;
+  private final Timer timer;
   private long lastAppliedTxId;
 
   /** Total number of end transactions loaded. */
   private int totalEdits = 0;
 
   public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
+    this(fsNamesys, lastAppliedTxId, new Timer());
+  }
+
+  @VisibleForTesting
+  FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
     this.fsNamesys = fsNamesys;
     this.blockManager = fsNamesys.getBlockManager();
     this.lastAppliedTxId = lastAppliedTxId;
+    this.timer = timer;
   }
 
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
@@ -154,14 +171,26 @@ public class FSEditLogLoader {
     prog.beginStep(Phase.LOADING_EDITS, step);
     fsNamesys.writeLock();
     try {
-      long startTime = monotonicNow();
-      FSImage.LOG.info("Start loading edits file " + edits.getName()
-          + " maxTxnsToRead = " + maxTxnsToRead);
+      long startTime = timer.monotonicNow();
+      LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
+      if (preLogAction.shouldLog()) {
+        FSImage.LOG.info("Start loading edits file " + edits.getName()
+            + " maxTxnsToRead = " + maxTxnsToRead +
+            LogThrottlingHelper.getLogSupressionMessage(preLogAction));
+      }
       long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
           maxTxnsToRead, startOpt, recovery);
-      FSImage.LOG.info("Edits file " + edits.getName()
-          + " of size " + edits.length() + " edits # " + numEdits
-          + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
+      long endTime = timer.monotonicNow();
+      LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
+          numEdits, edits.length(), endTime - startTime);
+      if (postLogAction.shouldLog()) {
+        FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
+            "total size {}, total edits {}, total load time {} ms",
+            postLogAction.getCount(), edits.getName(),
+            postLogAction.getStats(1).getSum(),
+            postLogAction.getStats(0).getSum(),
+            postLogAction.getStats(2).getSum());
+      }
       return numEdits;
     } finally {
       edits.close();
@@ -202,7 +231,7 @@ public class FSEditLogLoader {
     Step step = createStartupProgressStep(in);
     prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
     Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
-    long lastLogTime = monotonicNow();
+    long lastLogTime = timer.monotonicNow();
     long lastInodeId = fsNamesys.dir.getLastInodeId();
 
     try {
@@ -282,7 +311,7 @@ public class FSEditLogLoader {
         }
         // log progress
         if (op.hasTransactionId()) {
-          long now = monotonicNow();
+          long now = timer.monotonicNow();
           if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
             long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
             int percent = Math.round((float) deltaTxId / numTxns * 100);
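The "pre"/"post" pair above is LogThrottlingHelper's named-recorder idiom:
both recorders share one helper, and the first name recorded ("pre") acts as
the primary recorder that opens and closes each logging period, so the start
message and the summary message are throttled in lockstep. The shape of the
idiom, sketched outside the patch (PairedRecorderSketch, sizeBytes, and the
zeroed-out work are stand-ins for the real loader):

    import org.apache.hadoop.log.LogThrottlingHelper;
    import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
    import org.apache.hadoop.util.Timer;

    class PairedRecorderSketch {
      private final LogThrottlingHelper helper = new LogThrottlingHelper(5000);
      private final Timer timer = new Timer();

      long load(long sizeBytes) {
        long start = timer.monotonicNow();
        // "pre" is recorded first, so it controls the logging period.
        LogAction pre = helper.record("pre", start);
        if (pre.shouldLog()) {
          // emit the "Start loading..." statement, at most once per period
        }
        long numEdits = 0;  // stands in for loadEditRecords(...)
        long end = timer.monotonicNow();
        // Value indices on "post": 0 = numEdits, 1 = sizeBytes, 2 = elapsed ms.
        LogAction post = helper.record("post", end, numEdits, sizeBytes,
            end - start);
        if (post.shouldLog()) {
          // post.getStats(0).getSum() == total edits since the last summary
        }
        return numEdits;
      }
    }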
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 5cfc0176f1d..041a5ccb8de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Time;
 
@@ -123,6 +125,11 @@ public class FSImage implements Closeable {
   private final Set<Long> currentlyCheckpointing =
       Collections.synchronizedSet(new HashSet<Long>());
 
+  /** Limit logging about edit loading to every 5 seconds max. */
+  private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
   /**
    * Construct an FSImage
    * @param conf Configuration
@@ -884,8 +891,16 @@ public class FSImage implements Closeable {
 
     // Load latest edits
     for (EditLogInputStream editIn : editStreams) {
-      LOG.info("Reading " + editIn + " expecting start txid #" +
-          (lastAppliedTxId + 1));
+      LogAction logAction = loadEditLogHelper.record();
+      if (logAction.shouldLog()) {
+        String logSuppressed = "";
+        if (logAction.getCount() > 1) {
+          logSuppressed = "; suppressed logging for " +
+              (logAction.getCount() - 1) + " edit reads";
+        }
+        LOG.info("Reading " + editIn + " expecting start txid #" +
+            (lastAppliedTxId + 1) + logSuppressed);
+      }
       try {
         loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
             startOpt, recovery);
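FSImage composes its suppression suffix by hand rather than calling
getLogSupressionMessage() because the wording differs ("; suppressed logging
for N edit reads"). The zero-argument record() still counts invocations,
which is all such a custom message needs; roughly (illustrative fragment
only):

    LogAction logAction = loadEditLogHelper.record();  // counts calls, no values
    String suffix = logAction.getCount() > 1
        ? "; suppressed logging for " + (logAction.getCount() - 1) + " edit reads"
        : "";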
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
index a73206b31ef..74bf39a73ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 
 /**
  * A merged input stream that handles failover between different edit logs.
@@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
   private long prevTxId;
   private final EditLogInputStream[] streams;
 
+  /** Limit logging about fast forwarding the stream to every 5 seconds max. */
+  private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper fastForwardLoggingHelper =
+      new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
+
   /**
    * States that the RedundantEditLogInputStream can be in.
    *
@@ -174,8 +181,12 @@ class RedundantEditLogInputStream extends EditLogInputStream {
       case SKIP_UNTIL:
         try {
           if (prevTxId != HdfsServerConstants.INVALID_TXID) {
-            LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
-                "' to transaction ID " + (prevTxId + 1));
+            LogAction logAction = fastForwardLoggingHelper.record();
+            if (logAction.shouldLog()) {
+              LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+                  "' to transaction ID " + (prevTxId + 1) +
+                  LogThrottlingHelper.getLogSupressionMessage(logAction));
+            }
             streams[curIdx].skipUntil(prevTxId + 1);
           }
         } catch (IOException e) {
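The Timer injected into FSEditLogLoader earlier in this patch is what makes
the new test below deterministic: a FakeTimer only moves when explicitly
advanced, so the test can land exactly inside or outside the 5-second window.
Expected behavior of the pre-existing org.apache.hadoop.util.FakeTimer, as the
test relies on it:

    FakeTimer timer = new FakeTimer();
    long t0 = timer.monotonicNow();
    timer.advance(2500);  // milliseconds; time is frozen between advances
    // timer.monotonicNow() - t0 == 2500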
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index c074ae16816..87b8ee82b56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedInputStream;
 import java.io.File;
@@ -61,8 +64,10 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.log4j.Level;
+import org.apache.hadoop.util.FakeTimer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -101,6 +106,7 @@ public class TestFSEditLogLoader {
   private static final File TEST_DIR =
       PathUtils.getTestDir(TestFSEditLogLoader.class);
   private static final int NUM_DATA_NODES = 0;
+  private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
 
   private final ErasureCodingPolicy testECPolicy
       = StripedFileTestUtil.getDefaultECPolicy();
@@ -799,4 +805,46 @@ public class TestFSEditLogLoader {
       }
     }
   }
+
+  @Test
+  public void testLoadFSEditLogThrottling() throws Exception {
+    FSNamesystem namesystem = mock(FSNamesystem.class);
+    namesystem.dir = mock(FSDirectory.class);
+
+    FakeTimer timer = new FakeTimer();
+    FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
+
+    LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
+    loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
+    assertTrue(capture.getOutput().contains("Start loading edits file " +
+        FAKE_EDIT_STREAM_NAME));
+    assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
+    assertFalse(capture.getOutput().contains("suppressed"));
+
+    timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
+    capture.clearOutput();
+    loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
+    assertFalse(capture.getOutput().contains("Start loading edits file"));
+    assertFalse(capture.getOutput().contains("edits file(s)"));
+
+    timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
+    capture.clearOutput();
+    loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
+    assertTrue(capture.getOutput().contains("Start loading edits file " +
+        FAKE_EDIT_STREAM_NAME));
+    assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
+    assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
+    assertTrue(capture.getOutput().contains("total size 2.0"));
+  }
+
+  private EditLogInputStream getFakeEditLogInputStream(long startTx,
+      long endTx) throws IOException {
+    EditLogInputStream fakeStream = mock(EditLogInputStream.class);
+    when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
+    when(fakeStream.getFirstTxId()).thenReturn(startTx);
+    when(fakeStream.getLastTxId()).thenReturn(endTx);
+    when(fakeStream.length()).thenReturn(1L);
+    return fakeStream;
+  }
 }
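A note on the last three assertions: each fake stream reports length() == 1,
and the third load's summary aggregates the suppressed second load plus
itself, which is where "Loaded 2 edits file(s)", "total size 2.0", and
"suppressed logging 1 times" all come from:

    // load #1 at t=0ms:    logged immediately (count == 1, nothing suppressed)
    // load #2 at t=2500ms: inside the 5000 ms window -> suppressed, values kept
    // load #3 at t=7500ms: window elapsed -> logged with getCount() == 2,
    //                      sum(length()) == 1 + 1 == 2.0, suppressed == 2 - 1 == 1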