HDFS-16872. Fix log throttling by declaring LogThrottlingHelper as static members (#5246)

Co-authored-by: Chengbing Liu <liuchengbing@qiyi.com>
Signed-off-by: Erik Krogen <xkrogen@apache.org>

(cherry picked from commit 4cf304de45)
This commit is contained in:
Chengbing Liu 2023-01-11 02:03:25 +08:00 committed by Erik Krogen
parent a49378cb93
commit af96e0f5b3
4 changed files with 19 additions and 10 deletions

View File

@ -65,7 +65,7 @@ import org.apache.hadoop.util.Timer;
* <p>This class can also be used to coordinate multiple logging points; see
* {@link #record(String, long, double...)} for more details.
*
* <p>This class is not thread-safe.
* <p>This class is thread-safe.
*/
public class LogThrottlingHelper {
@ -192,7 +192,7 @@ public class LogThrottlingHelper {
* @return A LogAction indicating whether or not the caller should write to
* its log.
*/
public LogAction record(double... values) {
public synchronized LogAction record(double... values) {
return record(DEFAULT_RECORDER_NAME, timer.monotonicNow(), values);
}
@ -244,7 +244,7 @@ public class LogThrottlingHelper {
*
* @see #record(double...)
*/
public LogAction record(String recorderName, long currentTimeMs,
public synchronized LogAction record(String recorderName, long currentTimeMs,
double... values) {
if (primaryRecorderName == null) {
primaryRecorderName = recorderName;
@ -287,7 +287,7 @@ public class LogThrottlingHelper {
* @param idx The index value.
* @return The summary information.
*/
public SummaryStatistics getCurrentStats(String recorderName, int idx) {
public synchronized SummaryStatistics getCurrentStats(String recorderName, int idx) {
LoggingAction currentLog = currentLogs.get(recorderName);
if (currentLog != null) {
return currentLog.getStats(idx);
@ -314,6 +314,13 @@ public class LogThrottlingHelper {
}
}
@VisibleForTesting
public synchronized void reset() {
primaryRecorderName = null;
currentLogs.clear();
lastLogTimestampMs = Long.MIN_VALUE;
}
/**
* A standard log action which keeps track of all of the values which have
* been logged. This is also used for internal bookkeeping via its private

View File

@ -132,7 +132,8 @@ public class FSEditLogLoader {
/** Limit logging about edit loading to every 5 seconds max. */
@VisibleForTesting
static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
private final LogThrottlingHelper loadEditsLogHelper =
@VisibleForTesting
static final LogThrottlingHelper LOAD_EDITS_LOG_HELPER =
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
private final FSNamesystem fsNamesys;
@ -173,7 +174,7 @@ public class FSEditLogLoader {
fsNamesys.writeLock();
try {
long startTime = timer.monotonicNow();
LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
LogAction preLogAction = LOAD_EDITS_LOG_HELPER.record("pre", startTime);
if (preLogAction.shouldLog()) {
FSImage.LOG.info("Start loading edits file " + edits.getName()
+ " maxTxnsToRead = " + maxTxnsToRead +
@ -182,7 +183,7 @@ public class FSEditLogLoader {
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
maxTxnsToRead, startOpt, recovery);
long endTime = timer.monotonicNow();
LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
LogAction postLogAction = LOAD_EDITS_LOG_HELPER.record("post", endTime,
numEdits, edits.length(), endTime - startTime);
if (postLogAction.shouldLog()) {
FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +

View File

@ -47,7 +47,7 @@ class RedundantEditLogInputStream extends EditLogInputStream {
/** Limit logging about fast forwarding the stream to every 5 seconds max. */
private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
private final LogThrottlingHelper fastForwardLoggingHelper =
private static final LogThrottlingHelper FAST_FORWARD_LOGGING_HELPER =
new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
/**
@ -182,7 +182,7 @@ class RedundantEditLogInputStream extends EditLogInputStream {
case SKIP_UNTIL:
try {
if (prevTxId != HdfsServerConstants.INVALID_TXID) {
LogAction logAction = fastForwardLoggingHelper.record();
LogAction logAction = FAST_FORWARD_LOGGING_HELPER.record();
if (logAction.shouldLog()) {
LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
"' to transaction ID " + (prevTxId + 1) +

View File

@ -807,12 +807,13 @@ public class TestFSEditLogLoader {
}
@Test
public void setLoadFSEditLogThrottling() throws Exception {
public void testLoadFSEditLogThrottling() throws Exception {
FSNamesystem namesystem = mock(FSNamesystem.class);
namesystem.dir = mock(FSDirectory.class);
FakeTimer timer = new FakeTimer();
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
FSEditLogLoader.LOAD_EDITS_LOG_HELPER.reset();
LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);