diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
index aa4e61c570b..591c3fb8a0b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
@@ -272,6 +272,40 @@ public class LogThrottlingHelper {
}
}
+ /**
+ * Return the summary information for given index.
+ *
+ * @param recorderName The name of the recorder.
+ * @param idx The index value.
+ * @return The summary information.
+ */
+ public SummaryStatistics getCurrentStats(String recorderName, int idx) {
+ LoggingAction currentLog = currentLogs.get(recorderName);
+ if (currentLog != null) {
+ return currentLog.getStats(idx);
+ }
+
+ return null;
+ }
+
+ /**
+ * Helper function to create a message about how many log statements were
+ * suppressed in the provided log action. If no statements were suppressed,
+ * this returns an empty string. The message has the format (without quotes):
+ *
+ *
' (suppressed logging {suppression_count} times)'
+ *
+ * @param action The log action to produce a message about.
+ * @return A message about suppression within this action.
+ */
+ public static String getLogSupressionMessage(LogAction action) {
+ if (action.getCount() > 1) {
+ return " (suppressed logging " + (action.getCount() - 1) + " times)";
+ } else {
+ return "";
+ }
+ }
+
/**
* A standard log action which keeps track of all of the values which have
* been logged. This is also used for internal bookkeeping via its private
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index f4177e378ff..5f6d45f8117 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
private int outputBufferCapacity = 512 * 1024;
private final URLConnectionFactory connectionFactory;
+ /** Limit logging about input stream selection to every 5 seconds max. */
+ private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper selectInputStreamLogHelper =
+ new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
+
@VisibleForTesting
public QuorumJournalManager(Configuration conf,
URI uri,
@@ -568,8 +575,12 @@ public class QuorumJournalManager implements JournalManager {
"ID " + fromTxnId);
return;
}
- LOG.info("Selected loggers with >= " + maxAllowedTxns +
- " transactions starting from " + fromTxnId);
+ LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
+ if (logAction.shouldLog()) {
+ LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
+ "starting from lowest txn ID " + logAction.getStats(0).getMin() +
+ LogThrottlingHelper.getLogSupressionMessage(logAction));
+ }
PriorityQueue allStreams = new PriorityQueue<>(
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (GetJournaledEditsResponseProto resp : responseMap.values()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 82e35bd353e..0d86b76652b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
-import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -113,27 +112,45 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.util.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
+
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader {
static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
+ /** Limit logging about edit loading to every 5 seconds max. */
+ @VisibleForTesting
+ static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper loadEditsLogHelper =
+ new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
private final FSNamesystem fsNamesys;
private final BlockManager blockManager;
+ private final Timer timer;
private long lastAppliedTxId;
/** Total number of end transactions loaded. */
private int totalEdits = 0;
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
+ this(fsNamesys, lastAppliedTxId, new Timer());
+ }
+
+ @VisibleForTesting
+ FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
this.fsNamesys = fsNamesys;
this.blockManager = fsNamesys.getBlockManager();
this.lastAppliedTxId = lastAppliedTxId;
+ this.timer = timer;
}
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
@@ -154,14 +171,26 @@ public class FSEditLogLoader {
prog.beginStep(Phase.LOADING_EDITS, step);
fsNamesys.writeLock();
try {
- long startTime = monotonicNow();
- FSImage.LOG.info("Start loading edits file " + edits.getName()
- + " maxTxnsToRead = " + maxTxnsToRead);
+ long startTime = timer.monotonicNow();
+ LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
+ if (preLogAction.shouldLog()) {
+ FSImage.LOG.info("Start loading edits file " + edits.getName()
+ + " maxTxnsToRead = " + maxTxnsToRead +
+ LogThrottlingHelper.getLogSupressionMessage(preLogAction));
+ }
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
maxTxnsToRead, startOpt, recovery);
- FSImage.LOG.info("Edits file " + edits.getName()
- + " of size " + edits.length() + " edits # " + numEdits
- + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
+ long endTime = timer.monotonicNow();
+ LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
+ numEdits, edits.length(), endTime - startTime);
+ if (postLogAction.shouldLog()) {
+ FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
+ "total size {}, total edits {}, total load time {} ms",
+ postLogAction.getCount(), edits.getName(),
+ postLogAction.getStats(1).getSum(),
+ postLogAction.getStats(0).getSum(),
+ postLogAction.getStats(2).getSum());
+ }
return numEdits;
} finally {
edits.close();
@@ -202,7 +231,7 @@ public class FSEditLogLoader {
Step step = createStartupProgressStep(in);
prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
- long lastLogTime = monotonicNow();
+ long lastLogTime = timer.monotonicNow();
long lastInodeId = fsNamesys.dir.getLastInodeId();
try {
@@ -282,7 +311,7 @@ public class FSEditLogLoader {
}
// log progress
if (op.hasTransactionId()) {
- long now = monotonicNow();
+ long now = timer.monotonicNow();
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
int percent = Math.round((float) deltaTxId / numTxns * 100);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 5cfc0176f1d..041a5ccb8de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
@@ -123,6 +125,11 @@ public class FSImage implements Closeable {
private final Set currentlyCheckpointing =
Collections.synchronizedSet(new HashSet());
+ /** Limit logging about edit loading to every 5 seconds max. */
+ private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper loadEditLogHelper =
+ new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
/**
* Construct an FSImage
* @param conf Configuration
@@ -884,8 +891,16 @@ public class FSImage implements Closeable {
// Load latest edits
for (EditLogInputStream editIn : editStreams) {
- LOG.info("Reading " + editIn + " expecting start txid #" +
- (lastAppliedTxId + 1));
+ LogAction logAction = loadEditLogHelper.record();
+ if (logAction.shouldLog()) {
+ String logSuppressed = "";
+ if (logAction.getCount() > 1) {
+ logSuppressed = "; suppressed logging for " +
+ (logAction.getCount() - 1) + " edit reads";
+ }
+ LOG.info("Reading " + editIn + " expecting start txid #" +
+ (lastAppliedTxId + 1) + logSuppressed);
+ }
try {
loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
startOpt, recovery);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
index a73206b31ef..74bf39a73ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
/**
* A merged input stream that handles failover between different edit logs.
@@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
private long prevTxId;
private final EditLogInputStream[] streams;
+ /** Limit logging about fast forwarding the stream to every 5 seconds max. */
+ private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper fastForwardLoggingHelper =
+ new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
+
/**
* States that the RedundantEditLogInputStream can be in.
*
@@ -174,8 +181,12 @@ class RedundantEditLogInputStream extends EditLogInputStream {
case SKIP_UNTIL:
try {
if (prevTxId != HdfsServerConstants.INVALID_TXID) {
- LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
- "' to transaction ID " + (prevTxId + 1));
+ LogAction logAction = fastForwardLoggingHelper.record();
+ if (logAction.shouldLog()) {
+ LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+ "' to transaction ID " + (prevTxId + 1) +
+ LogThrottlingHelper.getLogSupressionMessage(logAction));
+ }
streams[curIdx].skipUntil(prevTxId + 1);
}
} catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index c074ae16816..87b8ee82b56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.BufferedInputStream;
import java.io.File;
@@ -61,8 +64,10 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Level;
+import org.apache.hadoop.util.FakeTimer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -101,6 +106,7 @@ public class TestFSEditLogLoader {
private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
private static final int NUM_DATA_NODES = 0;
+ private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
private final ErasureCodingPolicy testECPolicy
= StripedFileTestUtil.getDefaultECPolicy();
@@ -799,4 +805,46 @@ public class TestFSEditLogLoader {
}
}
}
+
+ @Test
+ public void setLoadFSEditLogThrottling() throws Exception {
+ FSNamesystem namesystem = mock(FSNamesystem.class);
+ namesystem.dir = mock(FSDirectory.class);
+
+ FakeTimer timer = new FakeTimer();
+ FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
+
+ LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
+ loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
+ assertTrue(capture.getOutput().contains("Start loading edits file " +
+ FAKE_EDIT_STREAM_NAME));
+ assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
+ assertFalse(capture.getOutput().contains("suppressed"));
+
+ timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
+ capture.clearOutput();
+ loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
+ assertFalse(capture.getOutput().contains("Start loading edits file"));
+ assertFalse(capture.getOutput().contains("edits file(s)"));
+
+ timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
+ capture.clearOutput();
+ loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
+ assertTrue(capture.getOutput().contains("Start loading edits file " +
+ FAKE_EDIT_STREAM_NAME));
+ assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
+ assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
+ assertTrue(capture.getOutput().contains("total size 2.0"));
+ }
+
+ private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx)
+ throws IOException {
+ EditLogInputStream fakeStream = mock(EditLogInputStream.class);
+ when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
+ when(fakeStream.getFirstTxId()).thenReturn(startTx);
+ when(fakeStream.getLastTxId()).thenReturn(endTx);
+ when(fakeStream.length()).thenReturn(1L);
+ return fakeStream;
+ }
+
}