HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.
This commit is contained in:
parent
894f095219
commit
56af83c961
|
@ -272,6 +272,40 @@ public class LogThrottlingHelper {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the summary information for given index.
|
||||
*
|
||||
* @param recorderName The name of the recorder.
|
||||
* @param idx The index value.
|
||||
* @return The summary information.
|
||||
*/
|
||||
public SummaryStatistics getCurrentStats(String recorderName, int idx) {
|
||||
LoggingAction currentLog = currentLogs.get(recorderName);
|
||||
if (currentLog != null) {
|
||||
return currentLog.getStats(idx);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to create a message about how many log statements were
|
||||
* suppressed in the provided log action. If no statements were suppressed,
|
||||
* this returns an empty string. The message has the format (without quotes):
|
||||
*
|
||||
* <p/>' (suppressed logging <i>{suppression_count}</i> times)'
|
||||
*
|
||||
* @param action The log action to produce a message about.
|
||||
* @return A message about suppression within this action.
|
||||
*/
|
||||
public static String getLogSupressionMessage(LogAction action) {
|
||||
if (action.getCount() > 1) {
|
||||
return " (suppressed logging " + (action.getCount() - 1) + " times)";
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A standard log action which keeps track of all of the values which have
|
||||
* been logged. This is also used for internal bookkeeping via its private
|
||||
|
|
|
@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.log.LogThrottlingHelper;
|
||||
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
|
@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
|
|||
private int outputBufferCapacity = 512 * 1024;
|
||||
private final URLConnectionFactory connectionFactory;
|
||||
|
||||
/** Limit logging about input stream selection to every 5 seconds max. */
|
||||
private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
|
||||
private final LogThrottlingHelper selectInputStreamLogHelper =
|
||||
new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
|
||||
|
||||
@VisibleForTesting
|
||||
public QuorumJournalManager(Configuration conf,
|
||||
URI uri,
|
||||
|
@ -568,8 +575,12 @@ public class QuorumJournalManager implements JournalManager {
|
|||
"ID " + fromTxnId);
|
||||
return;
|
||||
}
|
||||
LOG.info("Selected loggers with >= " + maxAllowedTxns +
|
||||
" transactions starting from " + fromTxnId);
|
||||
LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
|
||||
if (logAction.shouldLog()) {
|
||||
LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
|
||||
"starting from lowest txn ID " + logAction.getStats(0).getMin() +
|
||||
LogThrottlingHelper.getLogSupressionMessage(logAction));
|
||||
}
|
||||
PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
|
||||
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
|
||||
for (GetJournaledEditsResponseProto resp : responseMap.values()) {
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -113,27 +112,45 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
|
|||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
|
||||
import org.apache.hadoop.hdfs.util.Holder;
|
||||
import org.apache.hadoop.log.LogThrottlingHelper;
|
||||
import org.apache.hadoop.util.ChunkedArrayList;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class FSEditLogLoader {
|
||||
static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
|
||||
static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
|
||||
|
||||
/** Limit logging about edit loading to every 5 seconds max. */
|
||||
@VisibleForTesting
|
||||
static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
|
||||
private final LogThrottlingHelper loadEditsLogHelper =
|
||||
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
|
||||
|
||||
private final FSNamesystem fsNamesys;
|
||||
private final BlockManager blockManager;
|
||||
private final Timer timer;
|
||||
private long lastAppliedTxId;
|
||||
/** Total number of end transactions loaded. */
|
||||
private int totalEdits = 0;
|
||||
|
||||
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
|
||||
this(fsNamesys, lastAppliedTxId, new Timer());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
|
||||
this.fsNamesys = fsNamesys;
|
||||
this.blockManager = fsNamesys.getBlockManager();
|
||||
this.lastAppliedTxId = lastAppliedTxId;
|
||||
this.timer = timer;
|
||||
}
|
||||
|
||||
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
|
||||
|
@ -154,14 +171,26 @@ public class FSEditLogLoader {
|
|||
prog.beginStep(Phase.LOADING_EDITS, step);
|
||||
fsNamesys.writeLock();
|
||||
try {
|
||||
long startTime = monotonicNow();
|
||||
long startTime = timer.monotonicNow();
|
||||
LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
|
||||
if (preLogAction.shouldLog()) {
|
||||
FSImage.LOG.info("Start loading edits file " + edits.getName()
|
||||
+ " maxTxnsToRead = " + maxTxnsToRead);
|
||||
+ " maxTxnsToRead = " + maxTxnsToRead +
|
||||
LogThrottlingHelper.getLogSupressionMessage(preLogAction));
|
||||
}
|
||||
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
|
||||
maxTxnsToRead, startOpt, recovery);
|
||||
FSImage.LOG.info("Edits file " + edits.getName()
|
||||
+ " of size " + edits.length() + " edits # " + numEdits
|
||||
+ " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
|
||||
long endTime = timer.monotonicNow();
|
||||
LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
|
||||
numEdits, edits.length(), endTime - startTime);
|
||||
if (postLogAction.shouldLog()) {
|
||||
FSImage.LOG.info("Loaded " + postLogAction.getCount()
|
||||
+ " edits file(s) (the last named " + edits.getName()
|
||||
+ ") of total size " + postLogAction.getStats(1).getSum()
|
||||
+ ", total edits " + postLogAction.getStats(0).getSum()
|
||||
+ ", total load time " + postLogAction.getStats(2).getSum()
|
||||
+ " ms");
|
||||
}
|
||||
return numEdits;
|
||||
} finally {
|
||||
edits.close();
|
||||
|
@ -202,7 +231,7 @@ public class FSEditLogLoader {
|
|||
Step step = createStartupProgressStep(in);
|
||||
prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
|
||||
Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
|
||||
long lastLogTime = monotonicNow();
|
||||
long lastLogTime = timer.monotonicNow();
|
||||
long lastInodeId = fsNamesys.dir.getLastInodeId();
|
||||
|
||||
try {
|
||||
|
@ -282,7 +311,7 @@ public class FSEditLogLoader {
|
|||
}
|
||||
// log progress
|
||||
if (op.hasTransactionId()) {
|
||||
long now = monotonicNow();
|
||||
long now = timer.monotonicNow();
|
||||
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
|
||||
long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
|
||||
int percent = Math.round((float) deltaTxId / numTxns * 100);
|
||||
|
|
|
@ -69,6 +69,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|||
import org.apache.hadoop.hdfs.util.Canceler;
|
||||
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.log.LogThrottlingHelper;
|
||||
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
|
@ -123,6 +125,11 @@ public class FSImage implements Closeable {
|
|||
private final Set<Long> currentlyCheckpointing =
|
||||
Collections.<Long>synchronizedSet(new HashSet<Long>());
|
||||
|
||||
/** Limit logging about edit loading to every 5 seconds max. */
|
||||
private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
|
||||
private final LogThrottlingHelper loadEditLogHelper =
|
||||
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
|
||||
|
||||
/**
|
||||
* Construct an FSImage
|
||||
* @param conf Configuration
|
||||
|
@ -884,8 +891,16 @@ public class FSImage implements Closeable {
|
|||
|
||||
// Load latest edits
|
||||
for (EditLogInputStream editIn : editStreams) {
|
||||
LogAction logAction = loadEditLogHelper.record();
|
||||
if (logAction.shouldLog()) {
|
||||
String logSuppressed = "";
|
||||
if (logAction.getCount() > 1) {
|
||||
logSuppressed = "; suppressed logging for " +
|
||||
(logAction.getCount() - 1) + " edit reads";
|
||||
}
|
||||
LOG.info("Reading " + editIn + " expecting start txid #" +
|
||||
(lastAppliedTxId + 1));
|
||||
(lastAppliedTxId + 1) + logSuppressed);
|
||||
}
|
||||
try {
|
||||
loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
|
||||
startOpt, recovery);
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.apache.hadoop.log.LogThrottlingHelper;
|
||||
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
|
||||
|
||||
/**
|
||||
* A merged input stream that handles failover between different edit logs.
|
||||
|
@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
|
|||
private long prevTxId;
|
||||
private final EditLogInputStream[] streams;
|
||||
|
||||
/** Limit logging about fast forwarding the stream to every 5 seconds max. */
|
||||
private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
|
||||
private final LogThrottlingHelper fastForwardLoggingHelper =
|
||||
new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
|
||||
|
||||
/**
|
||||
* States that the RedundantEditLogInputStream can be in.
|
||||
*
|
||||
|
@ -174,8 +181,12 @@ class RedundantEditLogInputStream extends EditLogInputStream {
|
|||
case SKIP_UNTIL:
|
||||
try {
|
||||
if (prevTxId != HdfsServerConstants.INVALID_TXID) {
|
||||
LogAction logAction = fastForwardLoggingHelper.record();
|
||||
if (logAction.shouldLog()) {
|
||||
LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
|
||||
"' to transaction ID " + (prevTxId + 1));
|
||||
"' to transaction ID " + (prevTxId + 1) +
|
||||
LogThrottlingHelper.getLogSupressionMessage(logAction));
|
||||
}
|
||||
streams[curIdx].skipUntil(prevTxId + 1);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
|
@ -61,8 +64,10 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -101,6 +106,7 @@ public class TestFSEditLogLoader {
|
|||
private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
|
||||
|
||||
private static final int NUM_DATA_NODES = 0;
|
||||
private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
|
||||
|
||||
private final ErasureCodingPolicy testECPolicy
|
||||
= StripedFileTestUtil.getDefaultECPolicy();
|
||||
|
@ -799,4 +805,46 @@ public class TestFSEditLogLoader {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setLoadFSEditLogThrottling() throws Exception {
|
||||
FSNamesystem namesystem = mock(FSNamesystem.class);
|
||||
namesystem.dir = mock(FSDirectory.class);
|
||||
|
||||
FakeTimer timer = new FakeTimer();
|
||||
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
|
||||
|
||||
LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
|
||||
loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
|
||||
assertTrue(capture.getOutput().contains("Start loading edits file " +
|
||||
FAKE_EDIT_STREAM_NAME));
|
||||
assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
|
||||
assertFalse(capture.getOutput().contains("suppressed"));
|
||||
|
||||
timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
|
||||
capture.clearOutput();
|
||||
loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
|
||||
assertFalse(capture.getOutput().contains("Start loading edits file"));
|
||||
assertFalse(capture.getOutput().contains("edits file(s)"));
|
||||
|
||||
timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
|
||||
capture.clearOutput();
|
||||
loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
|
||||
assertTrue(capture.getOutput().contains("Start loading edits file " +
|
||||
FAKE_EDIT_STREAM_NAME));
|
||||
assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
|
||||
assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
|
||||
assertTrue(capture.getOutput().contains("total size 2.0"));
|
||||
}
|
||||
|
||||
private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx)
|
||||
throws IOException {
|
||||
EditLogInputStream fakeStream = mock(EditLogInputStream.class);
|
||||
when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
|
||||
when(fakeStream.getFirstTxId()).thenReturn(startTx);
|
||||
when(fakeStream.getLastTxId()).thenReturn(endTx);
|
||||
when(fakeStream.length()).thenReturn(1L);
|
||||
return fakeStream;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue