HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock. Contributed by Erik Krogen.

(cherry-picked from commit 434c5ea75d)
This commit is contained in:
Zhe Zhang 2016-09-30 13:15:59 -07:00
parent a6863e5334
commit dfca0f4abb
4 changed files with 496 additions and 447 deletions

View File

@ -71,12 +71,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
@ -132,8 +126,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -288,7 +280,6 @@
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
@ -700,12 +691,9 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
LOG.info("Enabling async auditlog");
enableAsyncAuditLog();
}
boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
LOG.info("fsLock is fair:" + fair);
fsLock = new FSNamesystemLock(fair);
cond = fsLock.writeLock().newCondition();
fsLock = new FSNamesystemLock(conf);
cond = fsLock.newWriteLockCondition();
cpLock = new ReentrantLock();
setTimer(new Timer());
this.fsImage = fsImage;
try {
@ -816,17 +804,6 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_KEY,
DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_DEFAULT);
this.writeLockReportingThreshold = conf.getLong(
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
this.readLockReportingThreshold = conf.getLong(
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
this.lockSuppressWarningInterval = conf.getTimeDuration(
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
// For testing purposes, allow the DT secret manager to be started regardless
// of whether security is enabled.
alwaysUseDelegationTokensForTests = conf.getBoolean(
@ -1505,131 +1482,25 @@ public static List<URI> getSharedEditsDirs(Configuration conf) {
return Util.stringCollectionAsURIs(dirNames);
}
private final long lockSuppressWarningInterval;
/** Threshold (ms) for long holding write lock report. */
private final long writeLockReportingThreshold;
private int numWriteLockWarningsSuppressed = 0;
private long timeStampOfLastWriteLockReport = 0;
private long longestWriteLockHeldInterval = 0;
/** Last time stamp for write lock. Keep the longest one for multi-entrance.*/
private long writeLockHeldTimeStamp;
/** Threshold (ms) for long holding read lock report. */
private long readLockReportingThreshold;
private AtomicInteger numReadLockWarningsSuppressed = new AtomicInteger(0);
private AtomicLong timeStampOfLastReadLockReport = new AtomicLong(0);
private AtomicLong longestReadLockHeldInterval = new AtomicLong(0);
private Timer timer;
/**
* Last time stamp for read lock. Keep the longest one for
* multi-entrance. This is ThreadLocal since there could be
* many read locks held simultaneously.
*/
private static ThreadLocal<Long> readLockHeldTimeStamp =
new ThreadLocal<Long>() {
@Override
public Long initialValue() {
return Long.MAX_VALUE;
}
};
@Override
public void readLock() {
this.fsLock.readLock().lock();
if (this.fsLock.getReadHoldCount() == 1) {
readLockHeldTimeStamp.set(timer.monotonicNow());
}
this.fsLock.readLock();
}
@Override
public void readUnlock() {
final boolean needReport = this.fsLock.getReadHoldCount() == 1;
final long readLockInterval = timer.monotonicNow() -
readLockHeldTimeStamp.get();
if (needReport) {
readLockHeldTimeStamp.remove();
}
this.fsLock.readLock().unlock();
if (needReport && readLockInterval >= this.readLockReportingThreshold) {
long localLongestReadLock;
do {
localLongestReadLock = longestReadLockHeldInterval.get();
} while (localLongestReadLock - readLockInterval < 0
&& !longestReadLockHeldInterval.compareAndSet(localLongestReadLock,
readLockInterval));
long localTimeStampOfLastReadLockReport;
long now;
do {
now = timer.monotonicNow();
localTimeStampOfLastReadLockReport = timeStampOfLastReadLockReport
.get();
if (now - localTimeStampOfLastReadLockReport <
lockSuppressWarningInterval) {
numReadLockWarningsSuppressed.incrementAndGet();
return;
}
} while (!timeStampOfLastReadLockReport.compareAndSet(
localTimeStampOfLastReadLockReport, now));
int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0);
long longestLockHeldInterval = longestReadLockHeldInterval.getAndSet(0);
LOG.info("FSNamesystem read lock held for " + readLockInterval +
" ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) +
"\tNumber of suppressed read-lock reports: " +
numSuppressedWarnings + "\n\tLongest read-lock held interval: " +
longestLockHeldInterval);
}
this.fsLock.readUnlock();
}
@Override
public void writeLock() {
this.fsLock.writeLock().lock();
if (fsLock.getWriteHoldCount() == 1) {
writeLockHeldTimeStamp = timer.monotonicNow();
}
this.fsLock.writeLock();
}
@Override
public void writeLockInterruptibly() throws InterruptedException {
this.fsLock.writeLock().lockInterruptibly();
if (fsLock.getWriteHoldCount() == 1) {
writeLockHeldTimeStamp = timer.monotonicNow();
}
this.fsLock.writeLockInterruptibly();
}
@Override
public void writeUnlock() {
final boolean needReport = fsLock.getWriteHoldCount() == 1 &&
fsLock.isWriteLockedByCurrentThread();
final long currentTime = timer.monotonicNow();
final long writeLockInterval = currentTime - writeLockHeldTimeStamp;
boolean logReport = false;
int numSuppressedWarnings = 0;
long longestLockHeldInterval = 0;
if (needReport && writeLockInterval >= this.writeLockReportingThreshold) {
if (writeLockInterval > longestWriteLockHeldInterval) {
longestWriteLockHeldInterval = writeLockInterval;
}
if (currentTime - timeStampOfLastWriteLockReport > this
.lockSuppressWarningInterval) {
logReport = true;
numSuppressedWarnings = numWriteLockWarningsSuppressed;
numWriteLockWarningsSuppressed = 0;
longestLockHeldInterval = longestWriteLockHeldInterval;
longestWriteLockHeldInterval = 0;
timeStampOfLastWriteLockReport = currentTime;
} else {
numWriteLockWarningsSuppressed++;
}
}
this.fsLock.writeLock().unlock();
if (logReport) {
LOG.info("FSNamesystem write lock held for " + writeLockInterval +
" ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) +
"\tNumber of suppressed write-lock reports: " +
numSuppressedWarnings + "\n\tLongest write-lock held interval: " +
longestLockHeldInterval);
}
this.fsLock.writeUnlock();
}
@Override
public boolean hasWriteLock() {
@ -7022,9 +6893,5 @@ public int getNumEnteringMaintenanceDataNodes() {
.size();
}
@VisibleForTesting
void setTimer(Timer newTimer) {
this.timer = newTimer;
}
}

View File

@ -19,33 +19,186 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Timer;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
/**
* Mimics a ReentrantReadWriteLock so more sophisticated locking capabilities
* are possible.
* Mimics a ReentrantReadWriteLock but does not directly implement the interface
* so more sophisticated locking capabilities and logging/metrics are possible.
*/
class FSNamesystemLock implements ReadWriteLock {
class FSNamesystemLock {
@VisibleForTesting
protected ReentrantReadWriteLock coarseLock;
FSNamesystemLock(boolean fair) {
private final Timer timer;
/**
* Log statements about long lock hold times will not be produced more
* frequently than this interval.
*/
private final long lockSuppressWarningInterval;
/** Threshold (ms) for long holding write lock report. */
private final long writeLockReportingThreshold;
/** Last time stamp for write lock. Keep the longest one for multi-entrance.*/
private long writeLockHeldTimeStamp;
private int numWriteLockWarningsSuppressed = 0;
private long timeStampOfLastWriteLockReport = 0;
private long longestWriteLockHeldInterval = 0;
/** Threshold (ms) for long holding read lock report. */
private final long readLockReportingThreshold;
/**
* Last time stamp for read lock. Keep the longest one for
* multi-entrance. This is ThreadLocal since there could be
* many read locks held simultaneously.
*/
private final ThreadLocal<Long> readLockHeldTimeStamp =
new ThreadLocal<Long>() {
@Override
public Long initialValue() {
return Long.MAX_VALUE;
}
};
private final AtomicInteger numReadLockWarningsSuppressed =
new AtomicInteger(0);
private final AtomicLong timeStampOfLastReadLockReport = new AtomicLong(0);
private final AtomicLong longestReadLockHeldInterval = new AtomicLong(0);
FSNamesystemLock(Configuration conf) {
this(conf, new Timer());
}
@VisibleForTesting
FSNamesystemLock(Configuration conf, Timer timer) {
boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
FSNamesystem.LOG.info("fsLock is fair: " + fair);
this.coarseLock = new ReentrantReadWriteLock(fair);
this.timer = timer;
this.writeLockReportingThreshold = conf.getLong(
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
this.readLockReportingThreshold = conf.getLong(
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
this.lockSuppressWarningInterval = conf.getTimeDuration(
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
}
public void readLock() {
coarseLock.readLock().lock();
if (coarseLock.getReadHoldCount() == 1) {
readLockHeldTimeStamp.set(timer.monotonicNow());
}
}
public void readUnlock() {
final boolean needReport = coarseLock.getReadHoldCount() == 1;
final long readLockInterval =
timer.monotonicNow() - readLockHeldTimeStamp.get();
coarseLock.readLock().unlock();
if (needReport) {
readLockHeldTimeStamp.remove();
}
if (needReport && readLockInterval >= this.readLockReportingThreshold) {
long localLongestReadLock;
do {
localLongestReadLock = longestReadLockHeldInterval.get();
} while (localLongestReadLock - readLockInterval < 0 &&
!longestReadLockHeldInterval.compareAndSet(localLongestReadLock,
readLockInterval));
long localTimeStampOfLastReadLockReport;
long now;
do {
now = timer.monotonicNow();
localTimeStampOfLastReadLockReport =
timeStampOfLastReadLockReport.get();
if (now - localTimeStampOfLastReadLockReport <
lockSuppressWarningInterval) {
numReadLockWarningsSuppressed.incrementAndGet();
return;
}
} while (!timeStampOfLastReadLockReport.compareAndSet(
localTimeStampOfLastReadLockReport, now));
int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0);
long longestLockHeldInterval = longestReadLockHeldInterval.getAndSet(0);
FSNamesystem.LOG.info("FSNamesystem read lock held for " +
readLockInterval + " ms via\n" +
StringUtils.getStackTrace(Thread.currentThread()) +
"\tNumber of suppressed read-lock reports: " + numSuppressedWarnings +
"\n\tLongest read-lock held interval: " + longestLockHeldInterval);
}
}
@Override
public Lock readLock() {
return coarseLock.readLock();
public void writeLock() {
coarseLock.writeLock().lock();
if (coarseLock.getWriteHoldCount() == 1) {
writeLockHeldTimeStamp = timer.monotonicNow();
}
}
@Override
public Lock writeLock() {
return coarseLock.writeLock();
public void writeLockInterruptibly() throws InterruptedException {
coarseLock.writeLock().lockInterruptibly();
if (coarseLock.getWriteHoldCount() == 1) {
writeLockHeldTimeStamp = timer.monotonicNow();
}
}
public void writeUnlock() {
final boolean needReport = coarseLock.getWriteHoldCount() == 1 &&
coarseLock.isWriteLockedByCurrentThread();
final long currentTime = timer.monotonicNow();
final long writeLockInterval = currentTime - writeLockHeldTimeStamp;
boolean logReport = false;
int numSuppressedWarnings = 0;
long longestLockHeldInterval = 0;
if (needReport && writeLockInterval >= this.writeLockReportingThreshold) {
if (writeLockInterval > longestWriteLockHeldInterval) {
longestWriteLockHeldInterval = writeLockInterval;
}
if (currentTime - timeStampOfLastWriteLockReport >
this.lockSuppressWarningInterval) {
logReport = true;
numSuppressedWarnings = numWriteLockWarningsSuppressed;
numWriteLockWarningsSuppressed = 0;
longestLockHeldInterval = longestWriteLockHeldInterval;
longestWriteLockHeldInterval = 0;
timeStampOfLastWriteLockReport = currentTime;
} else {
numWriteLockWarningsSuppressed++;
}
}
coarseLock.writeLock().unlock();
if (logReport) {
FSNamesystem.LOG.info("FSNamesystem write lock held for " +
writeLockInterval + " ms via\n" +
StringUtils.getStackTrace(Thread.currentThread()) +
"\tNumber of suppressed write-lock reports: " +
numSuppressedWarnings + "\n\tLongest write-lock held interval: " +
longestLockHeldInterval);
}
}
public int getReadHoldCount() {
@ -60,6 +213,10 @@ public boolean isWriteLockedByCurrentThread() {
return coarseLock.isWriteLockedByCurrentThread();
}
public Condition newWriteLockCondition() {
return coarseLock.writeLock().newCondition();
}
/**
* Returns the QueueLength of waiting threads.
*

View File

@ -20,7 +20,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import org.apache.hadoop.util.FakeTimer;
import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.*;
@ -31,7 +30,6 @@
import java.net.URI;
import java.util.Collection;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
@ -45,22 +43,12 @@
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
public class TestFSNamesystem {
@ -165,59 +153,6 @@ public void testReplQueuesActiveAfterStartupSafemode() throws IOException, Inter
assertTrue("Replication queues weren't being populated after entering "
+ "safemode 2nd time", bm.isPopulatingReplQueues());
}
@Test
public void testFsLockFairness() throws IOException, InterruptedException{
Configuration conf = new Configuration();
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
FSImage fsImage = Mockito.mock(FSImage.class);
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
conf.setBoolean("dfs.namenode.fslock.fair", true);
FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
assertTrue(fsNamesystem.getFsLockForTests().isFair());
conf.setBoolean("dfs.namenode.fslock.fair", false);
fsNamesystem = new FSNamesystem(conf, fsImage);
assertFalse(fsNamesystem.getFsLockForTests().isFair());
}
@Test
public void testFSNamesystemLockCompatibility() {
FSNamesystemLock rwLock = new FSNamesystemLock(true);
assertEquals(0, rwLock.getReadHoldCount());
rwLock.readLock().lock();
assertEquals(1, rwLock.getReadHoldCount());
rwLock.readLock().lock();
assertEquals(2, rwLock.getReadHoldCount());
rwLock.readLock().unlock();
assertEquals(1, rwLock.getReadHoldCount());
rwLock.readLock().unlock();
assertEquals(0, rwLock.getReadHoldCount());
assertFalse(rwLock.isWriteLockedByCurrentThread());
assertEquals(0, rwLock.getWriteHoldCount());
rwLock.writeLock().lock();
assertTrue(rwLock.isWriteLockedByCurrentThread());
assertEquals(1, rwLock.getWriteHoldCount());
rwLock.writeLock().lock();
assertTrue(rwLock.isWriteLockedByCurrentThread());
assertEquals(2, rwLock.getWriteHoldCount());
rwLock.writeLock().unlock();
assertTrue(rwLock.isWriteLockedByCurrentThread());
assertEquals(1, rwLock.getWriteHoldCount());
rwLock.writeLock().unlock();
assertFalse(rwLock.isWriteLockedByCurrentThread());
assertEquals(0, rwLock.getWriteHoldCount());
}
@Test
public void testReset() throws Exception {
@ -257,233 +192,6 @@ public void testGetEffectiveLayoutVersion() {
FSNamesystem.getEffectiveLayoutVersion(false, -63, -61, -63));
}
@Test
public void testFSLockGetWaiterCount() throws InterruptedException {
final int threadCount = 3;
final CountDownLatch latch = new CountDownLatch(threadCount);
final FSNamesystemLock rwLock = new FSNamesystemLock(true);
rwLock.writeLock().lock();
ExecutorService helper = Executors.newFixedThreadPool(threadCount);
for (int x = 0; x < threadCount; x++) {
helper.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
rwLock.readLock().lock();
}
});
}
latch.await();
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return (threadCount == rwLock.getQueueLength());
}
}, 10, 1000);
} catch (TimeoutException e) {
fail("Expected number of blocked thread not found");
}
}
/**
* Test when FSNamesystem write lock is held for a long time,
* logger will report it.
*/
@Test(timeout=45000)
public void testFSWriteLockLongHoldingReport() throws Exception {
final long writeLockReportingThreshold = 100L;
final long writeLockSuppressWarningInterval = 10000L;
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
writeLockReportingThreshold);
conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
FSImage fsImage = Mockito.mock(FSImage.class);
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
FakeTimer timer = new FakeTimer();
fsn.setTimer(timer);
timer.advance(writeLockSuppressWarningInterval);
LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
// Don't report if the write lock is held for a short time
fsn.writeLock();
fsn.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Report the first write lock warning if it is held for a long time
fsn.writeLock();
timer.advance(writeLockReportingThreshold + 10);
logs.clearOutput();
fsn.writeUnlock();
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Track but do not Report if the write lock is held (interruptibly) for
// a long time but time since last report does not exceed the suppress
// warning interval
fsn.writeLockInterruptibly();
timer.advance(writeLockReportingThreshold + 10);
logs.clearOutput();
fsn.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Track but do not Report if it's held for a long time when re-entering
// write lock but time since last report does not exceed the suppress
// warning interval
fsn.writeLock();
timer.advance(writeLockReportingThreshold/ 2 + 1);
fsn.writeLockInterruptibly();
timer.advance(writeLockReportingThreshold/ 2 + 1);
fsn.writeLock();
timer.advance(writeLockReportingThreshold/ 2);
logs.clearOutput();
fsn.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
logs.clearOutput();
fsn.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
logs.clearOutput();
fsn.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Report if it's held for a long time and time since last report exceeds
// the supress warning interval
timer.advance(writeLockSuppressWarningInterval);
fsn.writeLock();
timer.advance(writeLockReportingThreshold + 100);
logs.clearOutput();
fsn.writeUnlock();
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
assertTrue(logs.getOutput().contains("Number of suppressed write-lock " +
"reports: 2"));
}
/**
* Test when FSNamesystem read lock is held for a long time,
* logger will report it.
*/
@Test(timeout=45000)
public void testFSReadLockLongHoldingReport() throws Exception {
final long readLockReportingThreshold = 100L;
final long readLockSuppressWarningInterval = 10000L;
final String readLockLogStmt = "FSNamesystem read lock held for ";
Configuration conf = new Configuration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
readLockReportingThreshold);
conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
readLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
FSImage fsImage = Mockito.mock(FSImage.class);
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
final FakeTimer timer = new FakeTimer();
fsn.setTimer(timer);
timer.advance(readLockSuppressWarningInterval);
LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
// Don't report if the read lock is held for a short time
fsn.readLock();
fsn.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
logs.getOutput().contains(readLockLogStmt));
// Report the first read lock warning if it is held for a long time
fsn.readLock();
timer.advance(readLockReportingThreshold + 10);
logs.clearOutput();
fsn.readUnlock();
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())
&& logs.getOutput().contains(readLockLogStmt));
// Track but do not Report if the write lock is held for a long time but
// time since last report does not exceed the suppress warning interval
fsn.readLock();
timer.advance(readLockReportingThreshold + 10);
logs.clearOutput();
fsn.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())
&& logs.getOutput().contains(readLockLogStmt));
// Track but do not Report if it's held for a long time when re-entering
// read lock but time since last report does not exceed the suppress
// warning interval
fsn.readLock();
timer.advance(readLockReportingThreshold / 2 + 1);
fsn.readLock();
timer.advance(readLockReportingThreshold / 2 + 1);
logs.clearOutput();
fsn.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) ||
logs.getOutput().contains(readLockLogStmt));
logs.clearOutput();
fsn.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
logs.getOutput().contains(readLockLogStmt));
// Report if it's held for a long time (and time since last report
// exceeds the suppress warning interval) while another thread also has the
// read lock. Let one thread hold the lock long enough to activate an
// alert, then have another thread grab the read lock to ensure that this
// doesn't reset the timing.
timer.advance(readLockSuppressWarningInterval);
logs.clearOutput();
final CountDownLatch barrier = new CountDownLatch(1);
final CountDownLatch barrier2 = new CountDownLatch(1);
Thread t1 = new Thread() {
@Override
public void run() {
try {
fsn.readLock();
timer.advance(readLockReportingThreshold + 1);
barrier.countDown(); // Allow for t2 to acquire the read lock
barrier2.await(); // Wait until t2 has the read lock
fsn.readUnlock();
} catch (InterruptedException e) {
fail("Interrupted during testing");
}
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
barrier.await(); // Wait until t1 finishes sleeping
fsn.readLock();
barrier2.countDown(); // Allow for t1 to unlock
fsn.readUnlock();
} catch (InterruptedException e) {
fail("Interrupted during testing");
}
}
};
t1.start();
t2.start();
t1.join();
t2.join();
// Look for the differentiating class names in the stack trace
String stackTracePatternString =
String.format("INFO.+%s(.+\n){4}\\Q%%s\\E\\.run", readLockLogStmt);
Pattern t1Pattern = Pattern.compile(
String.format(stackTracePatternString, t1.getClass().getName()));
assertTrue(t1Pattern.matcher(logs.getOutput()).find());
Pattern t2Pattern = Pattern.compile(
String.format(stackTracePatternString, t2.getClass().getName()));
assertFalse(t2Pattern.matcher(logs.getOutput()).find());
assertTrue(logs.getOutput().contains("Number of suppressed read-lock " +
"reports: 2"));
}
@Test
public void testSafemodeReplicationConf() throws IOException {
Configuration conf = new Configuration();

View File

@ -0,0 +1,317 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.util.FakeTimer;
import org.apache.log4j.Level;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import static org.junit.Assert.*;
/**
* Tests the FSNamesystemLock, looking at lock compatibilities and
* proper logging of lock hold times.
*/
public class TestFSNamesystemLock {
@Test
public void testFsLockFairness() throws IOException, InterruptedException{
Configuration conf = new Configuration();
conf.setBoolean("dfs.namenode.fslock.fair", true);
FSNamesystemLock fsnLock = new FSNamesystemLock(conf);
assertTrue(fsnLock.coarseLock.isFair());
conf.setBoolean("dfs.namenode.fslock.fair", false);
fsnLock = new FSNamesystemLock(conf);
assertFalse(fsnLock.coarseLock.isFair());
}
@Test
public void testFSNamesystemLockCompatibility() {
FSNamesystemLock rwLock = new FSNamesystemLock(new Configuration());
assertEquals(0, rwLock.getReadHoldCount());
rwLock.readLock();
assertEquals(1, rwLock.getReadHoldCount());
rwLock.readLock();
assertEquals(2, rwLock.getReadHoldCount());
rwLock.readUnlock();
assertEquals(1, rwLock.getReadHoldCount());
rwLock.readUnlock();
assertEquals(0, rwLock.getReadHoldCount());
assertFalse(rwLock.isWriteLockedByCurrentThread());
assertEquals(0, rwLock.getWriteHoldCount());
rwLock.writeLock();
assertTrue(rwLock.isWriteLockedByCurrentThread());
assertEquals(1, rwLock.getWriteHoldCount());
rwLock.writeLock();
assertTrue(rwLock.isWriteLockedByCurrentThread());
assertEquals(2, rwLock.getWriteHoldCount());
rwLock.writeUnlock();
assertTrue(rwLock.isWriteLockedByCurrentThread());
assertEquals(1, rwLock.getWriteHoldCount());
rwLock.writeUnlock();
assertFalse(rwLock.isWriteLockedByCurrentThread());
assertEquals(0, rwLock.getWriteHoldCount());
}
@Test
public void testFSLockGetWaiterCount() throws InterruptedException {
final int threadCount = 3;
final CountDownLatch latch = new CountDownLatch(threadCount);
final Configuration conf = new Configuration();
conf.setBoolean("dfs.namenode.fslock.fair", true);
final FSNamesystemLock rwLock = new FSNamesystemLock(conf);
rwLock.writeLock();
ExecutorService helper = Executors.newFixedThreadPool(threadCount);
for (int x = 0; x < threadCount; x++) {
helper.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
rwLock.readLock();
}
});
}
latch.await();
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return (threadCount == rwLock.getQueueLength());
}
}, 10, 1000);
} catch (TimeoutException e) {
fail("Expected number of blocked thread not found");
}
}
/**
* Test when FSNamesystem write lock is held for a long time,
* logger will report it.
*/
@Test(timeout=45000)
public void testFSWriteLockLongHoldingReport() throws Exception {
final long writeLockReportingThreshold = 100L;
final long writeLockSuppressWarningInterval = 10000L;
Configuration conf = new Configuration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
writeLockReportingThreshold);
conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
final FakeTimer timer = new FakeTimer();
final FSNamesystemLock fsnLock = new FSNamesystemLock(conf, timer);
timer.advance(writeLockSuppressWarningInterval);
LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
// Don't report if the write lock is held for a short time
fsnLock.writeLock();
fsnLock.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Report if the write lock is held for a long time
fsnLock.writeLock();
timer.advance(writeLockReportingThreshold + 10);
logs.clearOutput();
fsnLock.writeUnlock();
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Track but do not report if the write lock is held (interruptibly) for
// a long time but time since last report does not exceed the suppress
// warning interval
fsnLock.writeLockInterruptibly();
timer.advance(writeLockReportingThreshold + 10);
logs.clearOutput();
fsnLock.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Track but do not report if it's held for a long time when re-entering
// write lock but time since last report does not exceed the suppress
// warning interval
fsnLock.writeLock();
timer.advance(writeLockReportingThreshold / 2 + 1);
fsnLock.writeLockInterruptibly();
timer.advance(writeLockReportingThreshold / 2 + 1);
fsnLock.writeLock();
timer.advance(writeLockReportingThreshold / 2);
logs.clearOutput();
fsnLock.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
logs.clearOutput();
fsnLock.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
logs.clearOutput();
fsnLock.writeUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
// Report if it's held for a long time and time since last report exceeds
// the supress warning interval
timer.advance(writeLockSuppressWarningInterval);
fsnLock.writeLock();
timer.advance(writeLockReportingThreshold + 100);
logs.clearOutput();
fsnLock.writeUnlock();
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
assertTrue(logs.getOutput().contains(
"Number of suppressed write-lock reports: 2"));
}
/**
* Test when FSNamesystem read lock is held for a long time,
* logger will report it.
*/
@Test(timeout=45000)
public void testFSReadLockLongHoldingReport() throws Exception {
final long readLockReportingThreshold = 100L;
final long readLockSuppressWarningInterval = 10000L;
final String readLockLogStmt = "FSNamesystem read lock held for ";
Configuration conf = new Configuration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
readLockReportingThreshold);
conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
readLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
final FakeTimer timer = new FakeTimer();
final FSNamesystemLock fsnLock = new FSNamesystemLock(conf, timer);
timer.advance(readLockSuppressWarningInterval);
LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
// Don't report if the read lock is held for a short time
fsnLock.readLock();
fsnLock.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
logs.getOutput().contains(readLockLogStmt));
// Report the first read lock warning if it is held for a long time
fsnLock.readLock();
timer.advance(readLockReportingThreshold + 10);
logs.clearOutput();
fsnLock.readUnlock();
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
logs.getOutput().contains(readLockLogStmt));
// Track but do not Report if the write lock is held for a long time but
// time since last report does not exceed the suppress warning interval
fsnLock.readLock();
timer.advance(readLockReportingThreshold + 10);
logs.clearOutput();
fsnLock.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
logs.getOutput().contains(readLockLogStmt));
// Track but do not Report if it's held for a long time when re-entering
// read lock but time since last report does not exceed the suppress
// warning interval
fsnLock.readLock();
timer.advance(readLockReportingThreshold / 2 + 1);
fsnLock.readLock();
timer.advance(readLockReportingThreshold / 2 + 1);
logs.clearOutput();
fsnLock.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) ||
logs.getOutput().contains(readLockLogStmt));
logs.clearOutput();
fsnLock.readUnlock();
assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
logs.getOutput().contains(readLockLogStmt));
// Report if it's held for a long time (and time since last report
// exceeds the suppress warning interval) while another thread also has the
// read lock. Let one thread hold the lock long enough to activate an
// alert, then have another thread grab the read lock to ensure that this
// doesn't reset the timing.
timer.advance(readLockSuppressWarningInterval);
logs.clearOutput();
final CountDownLatch barrier = new CountDownLatch(1);
final CountDownLatch barrier2 = new CountDownLatch(1);
Thread t1 = new Thread() {
@Override
public void run() {
try {
fsnLock.readLock();
timer.advance(readLockReportingThreshold + 1);
barrier.countDown(); // Allow for t2 to acquire the read lock
barrier2.await(); // Wait until t2 has the read lock
fsnLock.readUnlock();
} catch (InterruptedException e) {
fail("Interrupted during testing");
}
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
barrier.await(); // Wait until t1 finishes sleeping
fsnLock.readLock();
barrier2.countDown(); // Allow for t1 to unlock
fsnLock.readUnlock();
} catch (InterruptedException e) {
fail("Interrupted during testing");
}
}
};
t1.start();
t2.start();
t1.join();
t2.join();
// Look for the differentiating class names in the stack trace
String stackTracePatternString =
String.format("INFO.+%s(.+\n){4}\\Q%%s\\E\\.run", readLockLogStmt);
Pattern t1Pattern = Pattern.compile(
String.format(stackTracePatternString, t1.getClass().getName()));
assertTrue(t1Pattern.matcher(logs.getOutput()).find());
Pattern t2Pattern = Pattern.compile(
String.format(stackTracePatternString, t2.getClass().getName()));
assertFalse(t2Pattern.matcher(logs.getOutput()).find());
assertTrue(logs.getOutput().contains(
"Number of suppressed read-lock reports: 2"));
}
}