diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d5f2ca735bf..009623606eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -75,6 +75,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPOR import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT; @@ -130,6 +132,8 @@ import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -284,6 +288,7 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import 
org.apache.hadoop.util.Timer; import org.apache.hadoop.util.VersionInfo; import org.apache.log4j.Appender; import org.apache.log4j.AsyncAppender; @@ -700,6 +705,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, fsLock = new FSNamesystemLock(fair); cond = fsLock.writeLock().newCondition(); cpLock = new ReentrantLock(); + setTimer(new Timer()); this.fsImage = fsImage; try { @@ -817,6 +823,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT); + this.lockSuppressWarningInterval = conf.getTimeDuration( + DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, + DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + // For testing purposes, allow the DT secret manager to be started regardless // of whether security is enabled. alwaysUseDelegationTokensForTests = conf.getBoolean( @@ -1495,12 +1505,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return Util.stringCollectionAsURIs(dirNames); } + private final long lockSuppressWarningInterval; /** Threshold (ms) for long holding write lock report. */ - private long writeLockReportingThreshold; + private final long writeLockReportingThreshold; + private int numWriteLockWarningsSuppressed = 0; + private long timeStampOfLastWriteLockReport = 0; + private long longestWriteLockHeldInterval = 0; /** Last time stamp for write lock. Keep the longest one for multi-entrance.*/ private long writeLockHeldTimeStamp; /** Threshold (ms) for long holding read lock report. */ private long readLockReportingThreshold; + private AtomicInteger numReadLockWarningsSuppressed = new AtomicInteger(0); + private AtomicLong timeStampOfLastReadLockReport = new AtomicLong(0); + private AtomicLong longestReadLockHeldInterval = new AtomicLong(0); + private Timer timer; /** * Last time stamp for read lock. Keep the longest one for * multi-entrance. 
This is ThreadLocal since there could be @@ -1518,48 +1536,99 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, public void readLock() { this.fsLock.readLock().lock(); if (this.fsLock.getReadHoldCount() == 1) { - readLockHeldTimeStamp.set(monotonicNow()); + readLockHeldTimeStamp.set(timer.monotonicNow()); } } @Override public void readUnlock() { final boolean needReport = this.fsLock.getReadHoldCount() == 1; - final long readLockInterval = monotonicNow() - readLockHeldTimeStamp.get(); - this.fsLock.readLock().unlock(); - + final long readLockInterval = timer.monotonicNow() - + readLockHeldTimeStamp.get(); if (needReport) { readLockHeldTimeStamp.remove(); - if (readLockInterval > this.readLockReportingThreshold) { - LOG.info("FSNamesystem read lock held for " + readLockInterval + - " ms via\n" + StringUtils.getStackTrace(Thread.currentThread())); - } + } + + this.fsLock.readLock().unlock(); + + if (needReport && readLockInterval >= this.readLockReportingThreshold) { + long localLongestReadLock; + do { + localLongestReadLock = longestReadLockHeldInterval.get(); + } while (localLongestReadLock - readLockInterval < 0 + && !longestReadLockHeldInterval.compareAndSet(localLongestReadLock, + readLockInterval)); + + long localTimeStampOfLastReadLockReport; + long now; + do { + now = timer.monotonicNow(); + localTimeStampOfLastReadLockReport = timeStampOfLastReadLockReport + .get(); + if (now - localTimeStampOfLastReadLockReport < + lockSuppressWarningInterval) { + numReadLockWarningsSuppressed.incrementAndGet(); + return; + } + } while (!timeStampOfLastReadLockReport.compareAndSet( + localTimeStampOfLastReadLockReport, now)); + int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0); + long longestLockHeldInterval = longestReadLockHeldInterval.getAndSet(0); + LOG.info("FSNamesystem read lock held for " + readLockInterval + + " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) + + "\tNumber of suppressed read-lock reports: " + 
+ numSuppressedWarnings + "\n\tLongest read-lock held interval: " + + longestLockHeldInterval); } } @Override public void writeLock() { this.fsLock.writeLock().lock(); if (fsLock.getWriteHoldCount() == 1) { - writeLockHeldTimeStamp = monotonicNow(); + writeLockHeldTimeStamp = timer.monotonicNow(); } } @Override public void writeLockInterruptibly() throws InterruptedException { this.fsLock.writeLock().lockInterruptibly(); if (fsLock.getWriteHoldCount() == 1) { - writeLockHeldTimeStamp = monotonicNow(); + writeLockHeldTimeStamp = timer.monotonicNow(); } } @Override public void writeUnlock() { final boolean needReport = fsLock.getWriteHoldCount() == 1 && fsLock.isWriteLockedByCurrentThread(); - final long writeLockInterval = monotonicNow() - writeLockHeldTimeStamp; + final long currentTime = timer.monotonicNow(); + final long writeLockInterval = currentTime - writeLockHeldTimeStamp; + + boolean logReport = false; + int numSuppressedWarnings = 0; + long longestLockHeldInterval = 0; + if (needReport && writeLockInterval >= this.writeLockReportingThreshold) { + if (writeLockInterval > longestWriteLockHeldInterval) { + longestWriteLockHeldInterval = writeLockInterval; + } + if (currentTime - timeStampOfLastWriteLockReport > this + .lockSuppressWarningInterval) { + logReport = true; + numSuppressedWarnings = numWriteLockWarningsSuppressed; + numWriteLockWarningsSuppressed = 0; + longestLockHeldInterval = longestWriteLockHeldInterval; + longestWriteLockHeldInterval = 0; + timeStampOfLastWriteLockReport = currentTime; + } else { + numWriteLockWarningsSuppressed++; + } + } this.fsLock.writeLock().unlock(); - if (needReport && writeLockInterval >= this.writeLockReportingThreshold) { + if (logReport) { LOG.info("FSNamesystem write lock held for " + writeLockInterval + - " ms via\n" + StringUtils.getStackTrace(Thread.currentThread())); + " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) + + "\tNumber of suppressed write-lock reports: " + + numSuppressedWarnings + 
"\n\tLongest write-lock held interval: " + + longestLockHeldInterval); } } @Override @@ -6954,5 +7023,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, .size(); } + @VisibleForTesting + void setTimer(Timer newTimer) { + this.timer = newTimer; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java index 7d898d2545f..d258196339e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; +import org.apache.hadoop.util.FakeTimer; import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.*; @@ -28,7 +29,6 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import com.google.common.base.Supplier; @@ -58,8 +58,8 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.regex.Matcher; import java.util.regex.Pattern; public class TestFSNamesystem { @@ -295,45 +295,54 @@ public class TestFSNamesystem { @Test(timeout=45000) public void testFSWriteLockLongHoldingReport() throws Exception { final long writeLockReportingThreshold = 100L; + final long writeLockSuppressWarningInterval = 10000L; Configuration conf = new 
Configuration(); conf.setLong(DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, writeLockReportingThreshold); + conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, + writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS); FSImage fsImage = Mockito.mock(FSImage.class); FSEditLog fsEditLog = Mockito.mock(FSEditLog.class); Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); final FSNamesystem fsn = new FSNamesystem(conf, fsImage); + FakeTimer timer = new FakeTimer(); + fsn.setTimer(timer); + timer.advance(writeLockSuppressWarningInterval); + LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG); GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO); // Don't report if the write lock is held for a short time fsn.writeLock(); - Thread.sleep(writeLockReportingThreshold / 2); fsn.writeUnlock(); assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); - - // Report if the write lock is held for a long time + // Report the first write lock warning if it is held for a long time fsn.writeLock(); - Thread.sleep(writeLockReportingThreshold + 10); + timer.advance(writeLockReportingThreshold + 10); logs.clearOutput(); fsn.writeUnlock(); assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())); - // Report if the write lock is held (interruptibly) for a long time + // Track but do not Report if the write lock is held (interruptibly) for + // a long time but time since last report does not exceed the suppress + // warning interval fsn.writeLockInterruptibly(); - Thread.sleep(writeLockReportingThreshold + 10); + timer.advance(writeLockReportingThreshold + 10); logs.clearOutput(); fsn.writeUnlock(); - assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())); + assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); - // Report if it's held for a long time when re-entering write lock + // Track but do not Report if it's held for a long time when re-entering + // 
write lock but time since last report does not exceed the suppress + // warning interval fsn.writeLock(); - Thread.sleep(writeLockReportingThreshold/ 2 + 1); + timer.advance(writeLockReportingThreshold/ 2 + 1); fsn.writeLockInterruptibly(); - Thread.sleep(writeLockReportingThreshold / 2 + 1); + timer.advance(writeLockReportingThreshold/ 2 + 1); fsn.writeLock(); - Thread.sleep(writeLockReportingThreshold / 2); + timer.advance(writeLockReportingThreshold/ 2); logs.clearOutput(); fsn.writeUnlock(); assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); @@ -342,7 +351,18 @@ public class TestFSNamesystem { assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); logs.clearOutput(); fsn.writeUnlock(); + assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); + + // Report if it's held for a long time and time since last report exceeds + // the suppress warning interval + timer.advance(writeLockSuppressWarningInterval); + fsn.writeLock(); + timer.advance(writeLockReportingThreshold + 100); + logs.clearOutput(); + fsn.writeUnlock(); assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())); + assertTrue(logs.getOutput().contains("Number of suppressed write-lock " + + "reports: 2")); } /** @@ -352,52 +372,71 @@ public class TestFSNamesystem { @Test(timeout=45000) public void testFSReadLockLongHoldingReport() throws Exception { final long readLockReportingThreshold = 100L; + final long readLockSuppressWarningInterval = 10000L; final String readLockLogStmt = "FSNamesystem read lock held for "; Configuration conf = new Configuration(); conf.setLong( DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, readLockReportingThreshold); + conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, + readLockSuppressWarningInterval, TimeUnit.MILLISECONDS); FSImage fsImage = Mockito.mock(FSImage.class); FSEditLog fsEditLog = Mockito.mock(FSEditLog.class); 
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); final FSNamesystem fsn = new FSNamesystem(conf, fsImage); + final FakeTimer timer = new FakeTimer(); + fsn.setTimer(timer); + timer.advance(readLockSuppressWarningInterval); + LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG); GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO); // Don't report if the read lock is held for a short time fsn.readLock(); - Thread.sleep(readLockReportingThreshold / 2); fsn.readUnlock(); assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) && logs.getOutput().contains(readLockLogStmt)); - // Report if the read lock is held for a long time + // Report the first read lock warning if it is held for a long time fsn.readLock(); - Thread.sleep(readLockReportingThreshold + 10); + timer.advance(readLockReportingThreshold + 10); logs.clearOutput(); fsn.readUnlock(); assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) && logs.getOutput().contains(readLockLogStmt)); - // Report if it's held for a long time when re-entering read lock + // Track but do not Report if the read lock is held for a long time but + // time since last report does not exceed the suppress warning interval fsn.readLock(); - Thread.sleep(readLockReportingThreshold / 2 + 1); + timer.advance(readLockReportingThreshold + 10); + logs.clearOutput(); + fsn.readUnlock(); + assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) + && logs.getOutput().contains(readLockLogStmt)); + + // Track but do not Report if it's held for a long time when re-entering + // read lock but time since last report does not exceed the suppress + // warning interval fsn.readLock(); - Thread.sleep(readLockReportingThreshold / 2 + 1); + timer.advance(readLockReportingThreshold / 2 + 1); + fsn.readLock(); + timer.advance(readLockReportingThreshold / 2 + 1); logs.clearOutput(); fsn.readUnlock(); assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) || 
logs.getOutput().contains(readLockLogStmt)); logs.clearOutput(); fsn.readUnlock(); - assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) && + assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) && logs.getOutput().contains(readLockLogStmt)); - // Report if it's held for a long time while another thread also has the + // Report if it's held for a long time (and time since last report + // exceeds the suppress warning interval) while another thread also has the // read lock. Let one thread hold the lock long enough to activate an // alert, then have another thread grab the read lock to ensure that this // doesn't reset the timing. + timer.advance(readLockSuppressWarningInterval); logs.clearOutput(); final CountDownLatch barrier = new CountDownLatch(1); final CountDownLatch barrier2 = new CountDownLatch(1); @@ -406,7 +445,7 @@ public class TestFSNamesystem { public void run() { try { fsn.readLock(); - Thread.sleep(readLockReportingThreshold + 1); + timer.advance(readLockReportingThreshold + 1); barrier.countDown(); // Allow for t2 to acquire the read lock barrier2.await(); // Wait until t2 has the read lock fsn.readUnlock(); @@ -441,6 +480,8 @@ public class TestFSNamesystem { Pattern t2Pattern = Pattern.compile( String.format(stackTracePatternString, t2.getClass().getName())); assertFalse(t2Pattern.matcher(logs.getOutput()).find()); + assertTrue(logs.getOutput().contains("Number of suppressed read-lock " + + "reports: 2")); } @Test