HDFS-10713. Throttle FsNameSystem lock warnings. Contributed by Hanisha Koneru.
This commit is contained in:
parent
61368c8039
commit
b3c0656e90
|
@ -75,6 +75,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPOR
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
|
||||
|
@ -130,6 +132,8 @@ import java.util.TreeMap;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -284,6 +288,7 @@ import org.apache.hadoop.util.Daemon;
|
|||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.log4j.Appender;
|
||||
import org.apache.log4j.AsyncAppender;
|
||||
|
@ -700,6 +705,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
fsLock = new FSNamesystemLock(fair);
|
||||
cond = fsLock.writeLock().newCondition();
|
||||
cpLock = new ReentrantLock();
|
||||
setTimer(new Timer());
|
||||
|
||||
this.fsImage = fsImage;
|
||||
try {
|
||||
|
@ -817,6 +823,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
||||
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
|
||||
|
||||
this.lockSuppressWarningInterval = conf.getTimeDuration(
|
||||
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
|
||||
// For testing purposes, allow the DT secret manager to be started regardless
|
||||
// of whether security is enabled.
|
||||
alwaysUseDelegationTokensForTests = conf.getBoolean(
|
||||
|
@ -1495,12 +1505,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return Util.stringCollectionAsURIs(dirNames);
|
||||
}
|
||||
|
||||
private final long lockSuppressWarningInterval;
|
||||
/** Threshold (ms) for long holding write lock report. */
|
||||
private long writeLockReportingThreshold;
|
||||
private final long writeLockReportingThreshold;
|
||||
private int numWriteLockWarningsSuppressed = 0;
|
||||
private long timeStampOfLastWriteLockReport = 0;
|
||||
private long longestWriteLockHeldInterval = 0;
|
||||
/** Last time stamp for write lock. Keep the longest one for multi-entrance.*/
|
||||
private long writeLockHeldTimeStamp;
|
||||
/** Threshold (ms) for long holding read lock report. */
|
||||
private long readLockReportingThreshold;
|
||||
private AtomicInteger numReadLockWarningsSuppressed = new AtomicInteger(0);
|
||||
private AtomicLong timeStampOfLastReadLockReport = new AtomicLong(0);
|
||||
private AtomicLong longestReadLockHeldInterval = new AtomicLong(0);
|
||||
private Timer timer;
|
||||
/**
|
||||
* Last time stamp for read lock. Keep the longest one for
|
||||
* multi-entrance. This is ThreadLocal since there could be
|
||||
|
@ -1518,48 +1536,99 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
public void readLock() {
|
||||
this.fsLock.readLock().lock();
|
||||
if (this.fsLock.getReadHoldCount() == 1) {
|
||||
readLockHeldTimeStamp.set(monotonicNow());
|
||||
readLockHeldTimeStamp.set(timer.monotonicNow());
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void readUnlock() {
|
||||
final boolean needReport = this.fsLock.getReadHoldCount() == 1;
|
||||
final long readLockInterval = monotonicNow() - readLockHeldTimeStamp.get();
|
||||
this.fsLock.readLock().unlock();
|
||||
|
||||
final long readLockInterval = timer.monotonicNow() -
|
||||
readLockHeldTimeStamp.get();
|
||||
if (needReport) {
|
||||
readLockHeldTimeStamp.remove();
|
||||
if (readLockInterval > this.readLockReportingThreshold) {
|
||||
LOG.info("FSNamesystem read lock held for " + readLockInterval +
|
||||
" ms via\n" + StringUtils.getStackTrace(Thread.currentThread()));
|
||||
}
|
||||
}
|
||||
|
||||
this.fsLock.readLock().unlock();
|
||||
|
||||
if (needReport && readLockInterval >= this.readLockReportingThreshold) {
|
||||
long localLongestReadLock;
|
||||
do {
|
||||
localLongestReadLock = longestReadLockHeldInterval.get();
|
||||
} while (localLongestReadLock - readLockInterval < 0
|
||||
&& !longestReadLockHeldInterval.compareAndSet(localLongestReadLock,
|
||||
readLockInterval));
|
||||
|
||||
long localTimeStampOfLastReadLockReport;
|
||||
long now;
|
||||
do {
|
||||
now = timer.monotonicNow();
|
||||
localTimeStampOfLastReadLockReport = timeStampOfLastReadLockReport
|
||||
.get();
|
||||
if (now - localTimeStampOfLastReadLockReport <
|
||||
lockSuppressWarningInterval) {
|
||||
numReadLockWarningsSuppressed.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
} while (!timeStampOfLastReadLockReport.compareAndSet(
|
||||
localTimeStampOfLastReadLockReport, now));
|
||||
int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0);
|
||||
long longestLockHeldInterval = longestReadLockHeldInterval.getAndSet(0);
|
||||
LOG.info("FSNamesystem read lock held for " + readLockInterval +
|
||||
" ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) +
|
||||
"\tNumber of suppressed read-lock reports: " +
|
||||
numSuppressedWarnings + "\n\tLongest read-lock held interval: " +
|
||||
longestLockHeldInterval);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void writeLock() {
|
||||
this.fsLock.writeLock().lock();
|
||||
if (fsLock.getWriteHoldCount() == 1) {
|
||||
writeLockHeldTimeStamp = monotonicNow();
|
||||
writeLockHeldTimeStamp = timer.monotonicNow();
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void writeLockInterruptibly() throws InterruptedException {
|
||||
this.fsLock.writeLock().lockInterruptibly();
|
||||
if (fsLock.getWriteHoldCount() == 1) {
|
||||
writeLockHeldTimeStamp = monotonicNow();
|
||||
writeLockHeldTimeStamp = timer.monotonicNow();
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void writeUnlock() {
|
||||
final boolean needReport = fsLock.getWriteHoldCount() == 1 &&
|
||||
fsLock.isWriteLockedByCurrentThread();
|
||||
final long writeLockInterval = monotonicNow() - writeLockHeldTimeStamp;
|
||||
final long currentTime = timer.monotonicNow();
|
||||
final long writeLockInterval = currentTime - writeLockHeldTimeStamp;
|
||||
|
||||
boolean logReport = false;
|
||||
int numSuppressedWarnings = 0;
|
||||
long longestLockHeldInterval = 0;
|
||||
if (needReport && writeLockInterval >= this.writeLockReportingThreshold) {
|
||||
if (writeLockInterval > longestWriteLockHeldInterval) {
|
||||
longestWriteLockHeldInterval = writeLockInterval;
|
||||
}
|
||||
if (currentTime - timeStampOfLastWriteLockReport > this
|
||||
.lockSuppressWarningInterval) {
|
||||
logReport = true;
|
||||
numSuppressedWarnings = numWriteLockWarningsSuppressed;
|
||||
numWriteLockWarningsSuppressed = 0;
|
||||
longestLockHeldInterval = longestWriteLockHeldInterval;
|
||||
longestWriteLockHeldInterval = 0;
|
||||
timeStampOfLastWriteLockReport = currentTime;
|
||||
} else {
|
||||
numWriteLockWarningsSuppressed++;
|
||||
}
|
||||
}
|
||||
|
||||
this.fsLock.writeLock().unlock();
|
||||
|
||||
if (needReport && writeLockInterval >= this.writeLockReportingThreshold) {
|
||||
if (logReport) {
|
||||
LOG.info("FSNamesystem write lock held for " + writeLockInterval +
|
||||
" ms via\n" + StringUtils.getStackTrace(Thread.currentThread()));
|
||||
" ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) +
|
||||
"\tNumber of suppressed write-lock reports: " +
|
||||
numSuppressedWarnings + "\n\tLongest write-lock held interval: " +
|
||||
longestLockHeldInterval);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
|
@ -6954,5 +7023,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setTimer(Timer newTimer) {
|
||||
this.timer = newTimer;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import static org.hamcrest.CoreMatchers.either;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.*;
|
||||
|
@ -28,7 +29,6 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
@ -58,8 +58,8 @@ import java.util.List;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class TestFSNamesystem {
|
||||
|
@ -295,45 +295,54 @@ public class TestFSNamesystem {
|
|||
@Test(timeout=45000)
|
||||
public void testFSWriteLockLongHoldingReport() throws Exception {
|
||||
final long writeLockReportingThreshold = 100L;
|
||||
final long writeLockSuppressWarningInterval = 10000L;
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
||||
writeLockReportingThreshold);
|
||||
conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||
writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
|
||||
FSImage fsImage = Mockito.mock(FSImage.class);
|
||||
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
|
||||
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
|
||||
final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
||||
|
||||
FakeTimer timer = new FakeTimer();
|
||||
fsn.setTimer(timer);
|
||||
timer.advance(writeLockSuppressWarningInterval);
|
||||
|
||||
LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
|
||||
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
|
||||
|
||||
// Don't report if the write lock is held for a short time
|
||||
fsn.writeLock();
|
||||
Thread.sleep(writeLockReportingThreshold / 2);
|
||||
fsn.writeUnlock();
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
|
||||
|
||||
// Report if the write lock is held for a long time
|
||||
// Report the first write lock warning if it is held for a long time
|
||||
fsn.writeLock();
|
||||
Thread.sleep(writeLockReportingThreshold + 10);
|
||||
timer.advance(writeLockReportingThreshold + 10);
|
||||
logs.clearOutput();
|
||||
fsn.writeUnlock();
|
||||
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
|
||||
// Report if the write lock is held (interruptibly) for a long time
|
||||
// Track but do not Report if the write lock is held (interruptibly) for
|
||||
// a long time but time since last report does not exceed the suppress
|
||||
// warning interval
|
||||
fsn.writeLockInterruptibly();
|
||||
Thread.sleep(writeLockReportingThreshold + 10);
|
||||
timer.advance(writeLockReportingThreshold + 10);
|
||||
logs.clearOutput();
|
||||
fsn.writeUnlock();
|
||||
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
|
||||
// Report if it's held for a long time when re-entering write lock
|
||||
// Track but do not Report if it's held for a long time when re-entering
|
||||
// write lock but time since last report does not exceed the suppress
|
||||
// warning interval
|
||||
fsn.writeLock();
|
||||
Thread.sleep(writeLockReportingThreshold/ 2 + 1);
|
||||
timer.advance(writeLockReportingThreshold/ 2 + 1);
|
||||
fsn.writeLockInterruptibly();
|
||||
Thread.sleep(writeLockReportingThreshold / 2 + 1);
|
||||
timer.advance(writeLockReportingThreshold/ 2 + 1);
|
||||
fsn.writeLock();
|
||||
Thread.sleep(writeLockReportingThreshold / 2);
|
||||
timer.advance(writeLockReportingThreshold/ 2);
|
||||
logs.clearOutput();
|
||||
fsn.writeUnlock();
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
|
@ -342,7 +351,18 @@ public class TestFSNamesystem {
|
|||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
logs.clearOutput();
|
||||
fsn.writeUnlock();
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
|
||||
// Report if it's held for a long time and time since last report exceeds
|
||||
// the supress warning interval
|
||||
timer.advance(writeLockSuppressWarningInterval);
|
||||
fsn.writeLock();
|
||||
timer.advance(writeLockReportingThreshold + 100);
|
||||
logs.clearOutput();
|
||||
fsn.writeUnlock();
|
||||
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
||||
assertTrue(logs.getOutput().contains("Number of suppressed write-lock " +
|
||||
"reports: 2"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -352,52 +372,71 @@ public class TestFSNamesystem {
|
|||
@Test(timeout=45000)
|
||||
public void testFSReadLockLongHoldingReport() throws Exception {
|
||||
final long readLockReportingThreshold = 100L;
|
||||
final long readLockSuppressWarningInterval = 10000L;
|
||||
final String readLockLogStmt = "FSNamesystem read lock held for ";
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
||||
readLockReportingThreshold);
|
||||
conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||
readLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
|
||||
FSImage fsImage = Mockito.mock(FSImage.class);
|
||||
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
|
||||
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
|
||||
final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
||||
|
||||
final FakeTimer timer = new FakeTimer();
|
||||
fsn.setTimer(timer);
|
||||
timer.advance(readLockSuppressWarningInterval);
|
||||
|
||||
LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
|
||||
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
|
||||
|
||||
// Don't report if the read lock is held for a short time
|
||||
fsn.readLock();
|
||||
Thread.sleep(readLockReportingThreshold / 2);
|
||||
fsn.readUnlock();
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
|
||||
logs.getOutput().contains(readLockLogStmt));
|
||||
|
||||
// Report if the read lock is held for a long time
|
||||
// Report the first read lock warning if it is held for a long time
|
||||
fsn.readLock();
|
||||
Thread.sleep(readLockReportingThreshold + 10);
|
||||
timer.advance(readLockReportingThreshold + 10);
|
||||
logs.clearOutput();
|
||||
fsn.readUnlock();
|
||||
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())
|
||||
&& logs.getOutput().contains(readLockLogStmt));
|
||||
|
||||
// Report if it's held for a long time when re-entering read lock
|
||||
// Track but do not Report if the write lock is held for a long time but
|
||||
// time since last report does not exceed the suppress warning interval
|
||||
fsn.readLock();
|
||||
Thread.sleep(readLockReportingThreshold / 2 + 1);
|
||||
timer.advance(readLockReportingThreshold + 10);
|
||||
logs.clearOutput();
|
||||
fsn.readUnlock();
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())
|
||||
&& logs.getOutput().contains(readLockLogStmt));
|
||||
|
||||
// Track but do not Report if it's held for a long time when re-entering
|
||||
// read lock but time since last report does not exceed the suppress
|
||||
// warning interval
|
||||
fsn.readLock();
|
||||
Thread.sleep(readLockReportingThreshold / 2 + 1);
|
||||
timer.advance(readLockReportingThreshold / 2 + 1);
|
||||
fsn.readLock();
|
||||
timer.advance(readLockReportingThreshold / 2 + 1);
|
||||
logs.clearOutput();
|
||||
fsn.readUnlock();
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) ||
|
||||
logs.getOutput().contains(readLockLogStmt));
|
||||
logs.clearOutput();
|
||||
fsn.readUnlock();
|
||||
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
|
||||
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
|
||||
logs.getOutput().contains(readLockLogStmt));
|
||||
|
||||
// Report if it's held for a long time while another thread also has the
|
||||
// Report if it's held for a long time (and time since last report
|
||||
// exceeds the suppress warning interval) while another thread also has the
|
||||
// read lock. Let one thread hold the lock long enough to activate an
|
||||
// alert, then have another thread grab the read lock to ensure that this
|
||||
// doesn't reset the timing.
|
||||
timer.advance(readLockSuppressWarningInterval);
|
||||
logs.clearOutput();
|
||||
final CountDownLatch barrier = new CountDownLatch(1);
|
||||
final CountDownLatch barrier2 = new CountDownLatch(1);
|
||||
|
@ -406,7 +445,7 @@ public class TestFSNamesystem {
|
|||
public void run() {
|
||||
try {
|
||||
fsn.readLock();
|
||||
Thread.sleep(readLockReportingThreshold + 1);
|
||||
timer.advance(readLockReportingThreshold + 1);
|
||||
barrier.countDown(); // Allow for t2 to acquire the read lock
|
||||
barrier2.await(); // Wait until t2 has the read lock
|
||||
fsn.readUnlock();
|
||||
|
@ -441,6 +480,8 @@ public class TestFSNamesystem {
|
|||
Pattern t2Pattern = Pattern.compile(
|
||||
String.format(stackTracePatternString, t2.getClass().getName()));
|
||||
assertFalse(t2Pattern.matcher(logs.getOutput()).find());
|
||||
assertTrue(logs.getOutput().contains("Number of suppressed read-lock " +
|
||||
"reports: 2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue