From dfca0f4abbaa9b1c0bd11491594efa3fc13a73b0 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Fri, 30 Sep 2016 13:15:59 -0700 Subject: [PATCH] HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock. Contributed by Erik Krogen. (cherry-picked from commit 434c5ea75dc3d87513e49290ac9999148ff5163c) --- .../hdfs/server/namenode/FSNamesystem.java | 147 +------- .../server/namenode/FSNamesystemLock.java | 187 ++++++++++- .../server/namenode/TestFSNamesystem.java | 292 ---------------- .../server/namenode/TestFSNamesystemLock.java | 317 ++++++++++++++++++ 4 files changed, 496 insertions(+), 447 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemLock.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index be4d3972ab5..7feba2bd11c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -71,12 +71,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY; -import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT; @@ -132,8 +126,6 @@ import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -288,7 +280,6 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Timer; import org.apache.hadoop.util.VersionInfo; import org.apache.log4j.Appender; import org.apache.log4j.AsyncAppender; @@ -700,12 +691,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, LOG.info("Enabling async auditlog"); enableAsyncAuditLog(); } - boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true); - LOG.info("fsLock is fair:" + fair); - fsLock = new FSNamesystemLock(fair); - cond = fsLock.writeLock().newCondition(); + fsLock = new FSNamesystemLock(conf); + cond = fsLock.newWriteLockCondition(); cpLock = new ReentrantLock(); - setTimer(new Timer()); this.fsImage = fsImage; try { @@ -816,17 +804,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_KEY, 
DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_DEFAULT); - this.writeLockReportingThreshold = conf.getLong( - DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, - DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT); - this.readLockReportingThreshold = conf.getLong( - DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, - DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT); - - this.lockSuppressWarningInterval = conf.getTimeDuration( - DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, - DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - // For testing purposes, allow the DT secret manager to be started regardless // of whether security is enabled. alwaysUseDelegationTokensForTests = conf.getBoolean( @@ -1505,131 +1482,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return Util.stringCollectionAsURIs(dirNames); } - private final long lockSuppressWarningInterval; - /** Threshold (ms) for long holding write lock report. */ - private final long writeLockReportingThreshold; - private int numWriteLockWarningsSuppressed = 0; - private long timeStampOfLastWriteLockReport = 0; - private long longestWriteLockHeldInterval = 0; - /** Last time stamp for write lock. Keep the longest one for multi-entrance.*/ - private long writeLockHeldTimeStamp; - /** Threshold (ms) for long holding read lock report. */ - private long readLockReportingThreshold; - private AtomicInteger numReadLockWarningsSuppressed = new AtomicInteger(0); - private AtomicLong timeStampOfLastReadLockReport = new AtomicLong(0); - private AtomicLong longestReadLockHeldInterval = new AtomicLong(0); - private Timer timer; - /** - * Last time stamp for read lock. Keep the longest one for - * multi-entrance. This is ThreadLocal since there could be - * many read locks held simultaneously. 
- */ - private static ThreadLocal readLockHeldTimeStamp = - new ThreadLocal() { - @Override - public Long initialValue() { - return Long.MAX_VALUE; - } - }; - @Override public void readLock() { - this.fsLock.readLock().lock(); - if (this.fsLock.getReadHoldCount() == 1) { - readLockHeldTimeStamp.set(timer.monotonicNow()); - } + this.fsLock.readLock(); } @Override public void readUnlock() { - final boolean needReport = this.fsLock.getReadHoldCount() == 1; - final long readLockInterval = timer.monotonicNow() - - readLockHeldTimeStamp.get(); - if (needReport) { - readLockHeldTimeStamp.remove(); - } - - this.fsLock.readLock().unlock(); - - if (needReport && readLockInterval >= this.readLockReportingThreshold) { - long localLongestReadLock; - do { - localLongestReadLock = longestReadLockHeldInterval.get(); - } while (localLongestReadLock - readLockInterval < 0 - && !longestReadLockHeldInterval.compareAndSet(localLongestReadLock, - readLockInterval)); - - long localTimeStampOfLastReadLockReport; - long now; - do { - now = timer.monotonicNow(); - localTimeStampOfLastReadLockReport = timeStampOfLastReadLockReport - .get(); - if (now - localTimeStampOfLastReadLockReport < - lockSuppressWarningInterval) { - numReadLockWarningsSuppressed.incrementAndGet(); - return; - } - } while (!timeStampOfLastReadLockReport.compareAndSet( - localTimeStampOfLastReadLockReport, now)); - int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0); - long longestLockHeldInterval = longestReadLockHeldInterval.getAndSet(0); - LOG.info("FSNamesystem read lock held for " + readLockInterval + - " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) + - "\tNumber of suppressed read-lock reports: " + - numSuppressedWarnings + "\n\tLongest read-lock held interval: " + - longestLockHeldInterval); - } + this.fsLock.readUnlock(); } @Override public void writeLock() { - this.fsLock.writeLock().lock(); - if (fsLock.getWriteHoldCount() == 1) { - writeLockHeldTimeStamp = 
timer.monotonicNow(); - } + this.fsLock.writeLock(); } @Override public void writeLockInterruptibly() throws InterruptedException { - this.fsLock.writeLock().lockInterruptibly(); - if (fsLock.getWriteHoldCount() == 1) { - writeLockHeldTimeStamp = timer.monotonicNow(); - } + this.fsLock.writeLockInterruptibly(); } @Override public void writeUnlock() { - final boolean needReport = fsLock.getWriteHoldCount() == 1 && - fsLock.isWriteLockedByCurrentThread(); - final long currentTime = timer.monotonicNow(); - final long writeLockInterval = currentTime - writeLockHeldTimeStamp; - - boolean logReport = false; - int numSuppressedWarnings = 0; - long longestLockHeldInterval = 0; - if (needReport && writeLockInterval >= this.writeLockReportingThreshold) { - if (writeLockInterval > longestWriteLockHeldInterval) { - longestWriteLockHeldInterval = writeLockInterval; - } - if (currentTime - timeStampOfLastWriteLockReport > this - .lockSuppressWarningInterval) { - logReport = true; - numSuppressedWarnings = numWriteLockWarningsSuppressed; - numWriteLockWarningsSuppressed = 0; - longestLockHeldInterval = longestWriteLockHeldInterval; - longestWriteLockHeldInterval = 0; - timeStampOfLastWriteLockReport = currentTime; - } else { - numWriteLockWarningsSuppressed++; - } - } - - this.fsLock.writeLock().unlock(); - - if (logReport) { - LOG.info("FSNamesystem write lock held for " + writeLockInterval + - " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) + - "\tNumber of suppressed write-lock reports: " + - numSuppressedWarnings + "\n\tLongest write-lock held interval: " + - longestLockHeldInterval); - } + this.fsLock.writeUnlock(); } @Override public boolean hasWriteLock() { @@ -7022,9 +6893,5 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, .size(); } - @VisibleForTesting - void setTimer(Timer newTimer) { - this.timer = newTimer; - } } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java index d2397961257..043f5693829 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java @@ -19,33 +19,186 @@ package org.apache.hadoop.hdfs.server.namenode; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Timer; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY; /** - * Mimics a ReentrantReadWriteLock so more sophisticated locking capabilities - * are possible. + * Mimics a ReentrantReadWriteLock but does not directly implement the interface + * so more sophisticated locking capabilities and logging/metrics are possible. 
*/ -class FSNamesystemLock implements ReadWriteLock { +class FSNamesystemLock { @VisibleForTesting protected ReentrantReadWriteLock coarseLock; - - FSNamesystemLock(boolean fair) { + + private final Timer timer; + + /** + * Log statements about long lock hold times will not be produced more + * frequently than this interval. + */ + private final long lockSuppressWarningInterval; + + /** Threshold (ms) for long holding write lock report. */ + private final long writeLockReportingThreshold; + /** Last time stamp for write lock. Keep the longest one for multi-entrance.*/ + private long writeLockHeldTimeStamp; + private int numWriteLockWarningsSuppressed = 0; + private long timeStampOfLastWriteLockReport = 0; + private long longestWriteLockHeldInterval = 0; + + /** Threshold (ms) for long holding read lock report. */ + private final long readLockReportingThreshold; + /** + * Last time stamp for read lock. Keep the longest one for + * multi-entrance. This is ThreadLocal since there could be + * many read locks held simultaneously. 
+ */ + private final ThreadLocal readLockHeldTimeStamp = + new ThreadLocal() { + @Override + public Long initialValue() { + return Long.MAX_VALUE; + } + }; + private final AtomicInteger numReadLockWarningsSuppressed = + new AtomicInteger(0); + private final AtomicLong timeStampOfLastReadLockReport = new AtomicLong(0); + private final AtomicLong longestReadLockHeldInterval = new AtomicLong(0); + + FSNamesystemLock(Configuration conf) { + this(conf, new Timer()); + } + + @VisibleForTesting + FSNamesystemLock(Configuration conf, Timer timer) { + boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true); + FSNamesystem.LOG.info("fsLock is fair: " + fair); this.coarseLock = new ReentrantReadWriteLock(fair); + this.timer = timer; + + this.writeLockReportingThreshold = conf.getLong( + DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, + DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT); + this.readLockReportingThreshold = conf.getLong( + DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, + DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT); + this.lockSuppressWarningInterval = conf.getTimeDuration( + DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, + DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + } + + public void readLock() { + coarseLock.readLock().lock(); + if (coarseLock.getReadHoldCount() == 1) { + readLockHeldTimeStamp.set(timer.monotonicNow()); + } + } + + public void readUnlock() { + final boolean needReport = coarseLock.getReadHoldCount() == 1; + final long readLockInterval = + timer.monotonicNow() - readLockHeldTimeStamp.get(); + coarseLock.readLock().unlock(); + + if (needReport) { + readLockHeldTimeStamp.remove(); + } + if (needReport && readLockInterval >= this.readLockReportingThreshold) { + long localLongestReadLock; + do { + localLongestReadLock = longestReadLockHeldInterval.get(); + } while (localLongestReadLock - readLockInterval < 0 && + !longestReadLockHeldInterval.compareAndSet(localLongestReadLock, + 
readLockInterval)); + + long localTimeStampOfLastReadLockReport; + long now; + do { + now = timer.monotonicNow(); + localTimeStampOfLastReadLockReport = + timeStampOfLastReadLockReport.get(); + if (now - localTimeStampOfLastReadLockReport < + lockSuppressWarningInterval) { + numReadLockWarningsSuppressed.incrementAndGet(); + return; + } + } while (!timeStampOfLastReadLockReport.compareAndSet( + localTimeStampOfLastReadLockReport, now)); + int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0); + long longestLockHeldInterval = longestReadLockHeldInterval.getAndSet(0); + FSNamesystem.LOG.info("FSNamesystem read lock held for " + + readLockInterval + " ms via\n" + + StringUtils.getStackTrace(Thread.currentThread()) + + "\tNumber of suppressed read-lock reports: " + numSuppressedWarnings + + "\n\tLongest read-lock held interval: " + longestLockHeldInterval); + } } - @Override - public Lock readLock() { - return coarseLock.readLock(); + public void writeLock() { + coarseLock.writeLock().lock(); + if (coarseLock.getWriteHoldCount() == 1) { + writeLockHeldTimeStamp = timer.monotonicNow(); + } } - - @Override - public Lock writeLock() { - return coarseLock.writeLock(); + + public void writeLockInterruptibly() throws InterruptedException { + coarseLock.writeLock().lockInterruptibly(); + if (coarseLock.getWriteHoldCount() == 1) { + writeLockHeldTimeStamp = timer.monotonicNow(); + } + } + + public void writeUnlock() { + final boolean needReport = coarseLock.getWriteHoldCount() == 1 && + coarseLock.isWriteLockedByCurrentThread(); + final long currentTime = timer.monotonicNow(); + final long writeLockInterval = currentTime - writeLockHeldTimeStamp; + + boolean logReport = false; + int numSuppressedWarnings = 0; + long longestLockHeldInterval = 0; + if (needReport && writeLockInterval >= this.writeLockReportingThreshold) { + if (writeLockInterval > longestWriteLockHeldInterval) { + longestWriteLockHeldInterval = writeLockInterval; + } + if (currentTime - 
timeStampOfLastWriteLockReport > + this.lockSuppressWarningInterval) { + logReport = true; + numSuppressedWarnings = numWriteLockWarningsSuppressed; + numWriteLockWarningsSuppressed = 0; + longestLockHeldInterval = longestWriteLockHeldInterval; + longestWriteLockHeldInterval = 0; + timeStampOfLastWriteLockReport = currentTime; + } else { + numWriteLockWarningsSuppressed++; + } + } + + coarseLock.writeLock().unlock(); + + if (logReport) { + FSNamesystem.LOG.info("FSNamesystem write lock held for " + + writeLockInterval + " ms via\n" + + StringUtils.getStackTrace(Thread.currentThread()) + + "\tNumber of suppressed write-lock reports: " + + numSuppressedWarnings + "\n\tLongest write-lock held interval: " + + longestLockHeldInterval); + } } public int getReadHoldCount() { @@ -60,6 +213,10 @@ class FSNamesystemLock implements ReadWriteLock { return coarseLock.isWriteLockedByCurrentThread(); } + public Condition newWriteLockCondition() { + return coarseLock.writeLock().newCondition(); + } + /** * Returns the QueueLength of waiting threads. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java index d258196339e..f02c679f388 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; -import org.apache.hadoop.util.FakeTimer; import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.*; @@ -31,7 +30,6 @@ import java.net.InetAddress; import java.net.URI; import java.util.Collection; -import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; @@ -45,22 +43,12 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAState; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.GenericTestUtils.LogCapturer; -import org.apache.log4j.Level; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.regex.Pattern; 
public class TestFSNamesystem { @@ -165,59 +153,6 @@ public class TestFSNamesystem { assertTrue("Replication queues weren't being populated after entering " + "safemode 2nd time", bm.isPopulatingReplQueues()); } - - @Test - public void testFsLockFairness() throws IOException, InterruptedException{ - Configuration conf = new Configuration(); - - FSEditLog fsEditLog = Mockito.mock(FSEditLog.class); - FSImage fsImage = Mockito.mock(FSImage.class); - Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); - - conf.setBoolean("dfs.namenode.fslock.fair", true); - FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage); - assertTrue(fsNamesystem.getFsLockForTests().isFair()); - - conf.setBoolean("dfs.namenode.fslock.fair", false); - fsNamesystem = new FSNamesystem(conf, fsImage); - assertFalse(fsNamesystem.getFsLockForTests().isFair()); - } - - @Test - public void testFSNamesystemLockCompatibility() { - FSNamesystemLock rwLock = new FSNamesystemLock(true); - - assertEquals(0, rwLock.getReadHoldCount()); - rwLock.readLock().lock(); - assertEquals(1, rwLock.getReadHoldCount()); - - rwLock.readLock().lock(); - assertEquals(2, rwLock.getReadHoldCount()); - - rwLock.readLock().unlock(); - assertEquals(1, rwLock.getReadHoldCount()); - - rwLock.readLock().unlock(); - assertEquals(0, rwLock.getReadHoldCount()); - - assertFalse(rwLock.isWriteLockedByCurrentThread()); - assertEquals(0, rwLock.getWriteHoldCount()); - rwLock.writeLock().lock(); - assertTrue(rwLock.isWriteLockedByCurrentThread()); - assertEquals(1, rwLock.getWriteHoldCount()); - - rwLock.writeLock().lock(); - assertTrue(rwLock.isWriteLockedByCurrentThread()); - assertEquals(2, rwLock.getWriteHoldCount()); - - rwLock.writeLock().unlock(); - assertTrue(rwLock.isWriteLockedByCurrentThread()); - assertEquals(1, rwLock.getWriteHoldCount()); - - rwLock.writeLock().unlock(); - assertFalse(rwLock.isWriteLockedByCurrentThread()); - assertEquals(0, rwLock.getWriteHoldCount()); - } @Test public void testReset() throws 
Exception { @@ -257,233 +192,6 @@ public class TestFSNamesystem { FSNamesystem.getEffectiveLayoutVersion(false, -63, -61, -63)); } - @Test - public void testFSLockGetWaiterCount() throws InterruptedException { - final int threadCount = 3; - final CountDownLatch latch = new CountDownLatch(threadCount); - final FSNamesystemLock rwLock = new FSNamesystemLock(true); - rwLock.writeLock().lock(); - ExecutorService helper = Executors.newFixedThreadPool(threadCount); - - for (int x = 0; x < threadCount; x++) { - helper.execute(new Runnable() { - @Override - public void run() { - latch.countDown(); - rwLock.readLock().lock(); - } - }); - } - - latch.await(); - try { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - return (threadCount == rwLock.getQueueLength()); - } - }, 10, 1000); - } catch (TimeoutException e) { - fail("Expected number of blocked thread not found"); - } - } - - /** - * Test when FSNamesystem write lock is held for a long time, - * logger will report it. 
- */ - @Test(timeout=45000) - public void testFSWriteLockLongHoldingReport() throws Exception { - final long writeLockReportingThreshold = 100L; - final long writeLockSuppressWarningInterval = 10000L; - Configuration conf = new Configuration(); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, - writeLockReportingThreshold); - conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, - writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS); - FSImage fsImage = Mockito.mock(FSImage.class); - FSEditLog fsEditLog = Mockito.mock(FSEditLog.class); - Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); - final FSNamesystem fsn = new FSNamesystem(conf, fsImage); - - FakeTimer timer = new FakeTimer(); - fsn.setTimer(timer); - timer.advance(writeLockSuppressWarningInterval); - - LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG); - GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO); - - // Don't report if the write lock is held for a short time - fsn.writeLock(); - fsn.writeUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); - - // Report the first write lock warning if it is held for a long time - fsn.writeLock(); - timer.advance(writeLockReportingThreshold + 10); - logs.clearOutput(); - fsn.writeUnlock(); - assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())); - - // Track but do not Report if the write lock is held (interruptibly) for - // a long time but time since last report does not exceed the suppress - // warning interval - fsn.writeLockInterruptibly(); - timer.advance(writeLockReportingThreshold + 10); - logs.clearOutput(); - fsn.writeUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); - - // Track but do not Report if it's held for a long time when re-entering - // write lock but time since last report does not exceed the suppress - // warning interval - fsn.writeLock(); - 
timer.advance(writeLockReportingThreshold/ 2 + 1); - fsn.writeLockInterruptibly(); - timer.advance(writeLockReportingThreshold/ 2 + 1); - fsn.writeLock(); - timer.advance(writeLockReportingThreshold/ 2); - logs.clearOutput(); - fsn.writeUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); - logs.clearOutput(); - fsn.writeUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); - logs.clearOutput(); - fsn.writeUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())); - - // Report if it's held for a long time and time since last report exceeds - // the supress warning interval - timer.advance(writeLockSuppressWarningInterval); - fsn.writeLock(); - timer.advance(writeLockReportingThreshold + 100); - logs.clearOutput(); - fsn.writeUnlock(); - assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())); - assertTrue(logs.getOutput().contains("Number of suppressed write-lock " + - "reports: 2")); - } - - /** - * Test when FSNamesystem read lock is held for a long time, - * logger will report it. 
- */ - @Test(timeout=45000) - public void testFSReadLockLongHoldingReport() throws Exception { - final long readLockReportingThreshold = 100L; - final long readLockSuppressWarningInterval = 10000L; - final String readLockLogStmt = "FSNamesystem read lock held for "; - Configuration conf = new Configuration(); - conf.setLong( - DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, - readLockReportingThreshold); - conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, - readLockSuppressWarningInterval, TimeUnit.MILLISECONDS); - FSImage fsImage = Mockito.mock(FSImage.class); - FSEditLog fsEditLog = Mockito.mock(FSEditLog.class); - Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); - final FSNamesystem fsn = new FSNamesystem(conf, fsImage); - - final FakeTimer timer = new FakeTimer(); - fsn.setTimer(timer); - timer.advance(readLockSuppressWarningInterval); - - LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG); - GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO); - - // Don't report if the read lock is held for a short time - fsn.readLock(); - fsn.readUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) && - logs.getOutput().contains(readLockLogStmt)); - - // Report the first read lock warning if it is held for a long time - fsn.readLock(); - timer.advance(readLockReportingThreshold + 10); - logs.clearOutput(); - fsn.readUnlock(); - assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) - && logs.getOutput().contains(readLockLogStmt)); - - // Track but do not Report if the write lock is held for a long time but - // time since last report does not exceed the suppress warning interval - fsn.readLock(); - timer.advance(readLockReportingThreshold + 10); - logs.clearOutput(); - fsn.readUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) - && logs.getOutput().contains(readLockLogStmt)); - - // Track but do not Report if it's held for a long 
time when re-entering - // read lock but time since last report does not exceed the suppress - // warning interval - fsn.readLock(); - timer.advance(readLockReportingThreshold / 2 + 1); - fsn.readLock(); - timer.advance(readLockReportingThreshold / 2 + 1); - logs.clearOutput(); - fsn.readUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) || - logs.getOutput().contains(readLockLogStmt)); - logs.clearOutput(); - fsn.readUnlock(); - assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) && - logs.getOutput().contains(readLockLogStmt)); - - // Report if it's held for a long time (and time since last report - // exceeds the suppress warning interval) while another thread also has the - // read lock. Let one thread hold the lock long enough to activate an - // alert, then have another thread grab the read lock to ensure that this - // doesn't reset the timing. - timer.advance(readLockSuppressWarningInterval); - logs.clearOutput(); - final CountDownLatch barrier = new CountDownLatch(1); - final CountDownLatch barrier2 = new CountDownLatch(1); - Thread t1 = new Thread() { - @Override - public void run() { - try { - fsn.readLock(); - timer.advance(readLockReportingThreshold + 1); - barrier.countDown(); // Allow for t2 to acquire the read lock - barrier2.await(); // Wait until t2 has the read lock - fsn.readUnlock(); - } catch (InterruptedException e) { - fail("Interrupted during testing"); - } - } - }; - Thread t2 = new Thread() { - @Override - public void run() { - try { - barrier.await(); // Wait until t1 finishes sleeping - fsn.readLock(); - barrier2.countDown(); // Allow for t1 to unlock - fsn.readUnlock(); - } catch (InterruptedException e) { - fail("Interrupted during testing"); - } - } - }; - t1.start(); - t2.start(); - t1.join(); - t2.join(); - // Look for the differentiating class names in the stack trace - String stackTracePatternString = - String.format("INFO.+%s(.+\n){4}\\Q%%s\\E\\.run", readLockLogStmt); - 
Pattern t1Pattern = Pattern.compile( - String.format(stackTracePatternString, t1.getClass().getName())); - assertTrue(t1Pattern.matcher(logs.getOutput()).find()); - Pattern t2Pattern = Pattern.compile( - String.format(stackTracePatternString, t2.getClass().getName())); - assertFalse(t2Pattern.matcher(logs.getOutput()).find()); - assertTrue(logs.getOutput().contains("Number of suppressed read-lock " + - "reports: 2")); - } - @Test public void testSafemodeReplicationConf() throws IOException { Configuration conf = new Configuration(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemLock.java new file mode 100644 index 00000000000..08900ecba8f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemLock.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.hadoop.util.FakeTimer;
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests the FSNamesystemLock, looking at lock compatibilities and
+ * proper logging of lock hold times.
+ */
+public class TestFSNamesystemLock {
+
+  /**
+   * Test that the "dfs.namenode.fslock.fair" configuration value decides
+   * whether the underlying coarse lock is created in fair mode.
+   */
+  @Test
+  public void testFsLockFairness() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+
+    conf.setBoolean("dfs.namenode.fslock.fair", true);
+    FSNamesystemLock fsnLock = new FSNamesystemLock(conf);
+    assertTrue(fsnLock.coarseLock.isFair());
+
+    conf.setBoolean("dfs.namenode.fslock.fair", false);
+    fsnLock = new FSNamesystemLock(conf);
+    assertFalse(fsnLock.coarseLock.isFair());
+  }
+
+  /**
+   * Test that the read and write hold counts follow re-entrant acquisition
+   * and release of the lock by the current thread.
+   */
+  @Test
+  public void testFSNamesystemLockCompatibility() {
+    FSNamesystemLock rwLock = new FSNamesystemLock(new Configuration());
+
+    assertEquals(0, rwLock.getReadHoldCount());
+    rwLock.readLock();
+    assertEquals(1, rwLock.getReadHoldCount());
+
+    rwLock.readLock();
+    assertEquals(2, rwLock.getReadHoldCount());
+
+    rwLock.readUnlock();
+    assertEquals(1, rwLock.getReadHoldCount());
+
+    rwLock.readUnlock();
+    assertEquals(0, rwLock.getReadHoldCount());
+
+    assertFalse(rwLock.isWriteLockedByCurrentThread());
+    assertEquals(0, rwLock.getWriteHoldCount());
+    rwLock.writeLock();
+    assertTrue(rwLock.isWriteLockedByCurrentThread());
+    assertEquals(1, rwLock.getWriteHoldCount());
+
+    rwLock.writeLock();
+    assertTrue(rwLock.isWriteLockedByCurrentThread());
+    assertEquals(2, rwLock.getWriteHoldCount());
+
+    rwLock.writeUnlock();
+    assertTrue(rwLock.isWriteLockedByCurrentThread());
+    assertEquals(1, rwLock.getWriteHoldCount());
+
+    rwLock.writeUnlock();
+    assertFalse(rwLock.isWriteLockedByCurrentThread());
+    assertEquals(0, rwLock.getWriteHoldCount());
+  }
+
+  /**
+   * Test that the queue length reported by the lock matches the number of
+   * reader threads that are blocked while this thread holds the write lock.
+   */
+  @Test
+  public void testFSLockGetWaiterCount() throws InterruptedException {
+    final int threadCount = 3;
+    final CountDownLatch latch = new CountDownLatch(threadCount);
+    final Configuration conf = new Configuration();
+    conf.setBoolean("dfs.namenode.fslock.fair", true);
+    final FSNamesystemLock rwLock = new FSNamesystemLock(conf);
+    rwLock.writeLock();
+    ExecutorService helper = Executors.newFixedThreadPool(threadCount);
+
+    try {
+      for (int x = 0; x < threadCount; x++) {
+        helper.execute(new Runnable() {
+          @Override
+          public void run() {
+            latch.countDown();
+            // Blocks until the main thread releases the write lock
+            rwLock.readLock();
+            rwLock.readUnlock();
+          }
+        });
+      }
+
+      latch.await();
+      try {
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return (threadCount == rwLock.getQueueLength());
+          }
+        }, 10, 1000);
+      } catch (TimeoutException e) {
+        fail("Expected number of blocked thread not found");
+      }
+    } finally {
+      // Release the write lock before shutting the pool down: the readers
+      // are parked in readLock() and can only finish once it is available,
+      // otherwise the pool threads would stay blocked after the test ends.
+      rwLock.writeUnlock();
+      helper.shutdown();
+    }
+  }
+
+  /**
+   * Test when FSNamesystem write lock is held for a long time,
+   * logger will report it.
+   */
+  @Test(timeout=45000)
+  public void testFSWriteLockLongHoldingReport() throws Exception {
+    final long writeLockReportingThreshold = 100L;
+    final long writeLockSuppressWarningInterval = 10000L;
+    Configuration conf = new Configuration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
+        writeLockReportingThreshold);
+    conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+        writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
+
+    final FakeTimer timer = new FakeTimer();
+    final FSNamesystemLock fsnLock = new FSNamesystemLock(conf, timer);
+    timer.advance(writeLockSuppressWarningInterval);
+
+    LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
+    GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
+
+    // Don't report if the write lock is held for a short time
+    fsnLock.writeLock();
+    fsnLock.writeUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+
+    // Report if the write lock is held for a long time
+    fsnLock.writeLock();
+    timer.advance(writeLockReportingThreshold + 10);
+    logs.clearOutput();
+    fsnLock.writeUnlock();
+    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+
+    // Track but do not report if the write lock is held (interruptibly) for
+    // a long time but time since last report does not exceed the suppress
+    // warning interval
+    fsnLock.writeLockInterruptibly();
+    timer.advance(writeLockReportingThreshold + 10);
+    logs.clearOutput();
+    fsnLock.writeUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+
+    // Track but do not report if it's held for a long time when re-entering
+    // write lock but time since last report does not exceed the suppress
+    // warning interval
+    fsnLock.writeLock();
+    timer.advance(writeLockReportingThreshold / 2 + 1);
+    fsnLock.writeLockInterruptibly();
+    timer.advance(writeLockReportingThreshold / 2 + 1);
+    fsnLock.writeLock();
+    timer.advance(writeLockReportingThreshold / 2);
+    logs.clearOutput();
+    fsnLock.writeUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+    logs.clearOutput();
+    fsnLock.writeUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+    logs.clearOutput();
+    fsnLock.writeUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+
+    // Report if it's held for a long time and time since last report exceeds
+    // the suppress warning interval
+    timer.advance(writeLockSuppressWarningInterval);
+    fsnLock.writeLock();
+    timer.advance(writeLockReportingThreshold + 100);
+    logs.clearOutput();
+    fsnLock.writeUnlock();
+    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+    assertTrue(logs.getOutput().contains(
+        "Number of suppressed write-lock reports: 2"));
+  }
+
+  /**
+   * Test when FSNamesystem read lock is held for a long time,
+   * logger will report it.
+   */
+  @Test(timeout=45000)
+  public void testFSReadLockLongHoldingReport() throws Exception {
+    final long readLockReportingThreshold = 100L;
+    final long readLockSuppressWarningInterval = 10000L;
+    final String readLockLogStmt = "FSNamesystem read lock held for ";
+    Configuration conf = new Configuration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
+        readLockReportingThreshold);
+    conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+        readLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
+
+    final FakeTimer timer = new FakeTimer();
+    final FSNamesystemLock fsnLock = new FSNamesystemLock(conf, timer);
+    timer.advance(readLockSuppressWarningInterval);
+
+    LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
+    GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
+
+    // Don't report if the read lock is held for a short time
+    fsnLock.readLock();
+    fsnLock.readUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
+        logs.getOutput().contains(readLockLogStmt));
+
+    // Report the first read lock warning if it is held for a long time
+    fsnLock.readLock();
+    timer.advance(readLockReportingThreshold + 10);
+    logs.clearOutput();
+    fsnLock.readUnlock();
+    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
+        logs.getOutput().contains(readLockLogStmt));
+
+    // Track but do not report if the read lock is held for a long time but
+    // time since last report does not exceed the suppress warning interval
+    fsnLock.readLock();
+    timer.advance(readLockReportingThreshold + 10);
+    logs.clearOutput();
+    fsnLock.readUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
+        logs.getOutput().contains(readLockLogStmt));
+
+    // Track but do not report if it's held for a long time when re-entering
+    // read lock but time since last report does not exceed the suppress
+    // warning interval
+    fsnLock.readLock();
+    timer.advance(readLockReportingThreshold / 2 + 1);
+    fsnLock.readLock();
+    timer.advance(readLockReportingThreshold / 2 + 1);
+    logs.clearOutput();
+    fsnLock.readUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) ||
+        logs.getOutput().contains(readLockLogStmt));
+    logs.clearOutput();
+    fsnLock.readUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
+        logs.getOutput().contains(readLockLogStmt));
+
+    // Report if it's held for a long time (and time since last report
+    // exceeds the suppress warning interval) while another thread also has the
+    // read lock. Let one thread hold the lock long enough to activate an
+    // alert, then have another thread grab the read lock to ensure that this
+    // doesn't reset the timing.
+    timer.advance(readLockSuppressWarningInterval);
+    logs.clearOutput();
+    final CountDownLatch barrier = new CountDownLatch(1);
+    final CountDownLatch barrier2 = new CountDownLatch(1);
+    Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          fsnLock.readLock();
+          timer.advance(readLockReportingThreshold + 1);
+          barrier.countDown(); // Allow for t2 to acquire the read lock
+          barrier2.await(); // Wait until t2 has the read lock
+          fsnLock.readUnlock();
+        } catch (InterruptedException e) {
+          fail("Interrupted during testing");
+        }
+      }
+    };
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          // Wait until t1 holds the read lock and has advanced the timer
+          barrier.await();
+          fsnLock.readLock();
+          barrier2.countDown(); // Allow for t1 to unlock
+          fsnLock.readUnlock();
+        } catch (InterruptedException e) {
+          fail("Interrupted during testing");
+        }
+      }
+    };
+    t1.start();
+    t2.start();
+    t1.join();
+    t2.join();
+    // Look for the differentiating class names in the stack trace
+    String stackTracePatternString =
+        String.format("INFO.+%s(.+\n){4}\\Q%%s\\E\\.run", readLockLogStmt);
+    Pattern t1Pattern = Pattern.compile(
+        String.format(stackTracePatternString, t1.getClass().getName()));
+    assertTrue(t1Pattern.matcher(logs.getOutput()).find());
+    Pattern t2Pattern = Pattern.compile(
+        String.format(stackTracePatternString, t2.getClass().getName()));
+    assertFalse(t2Pattern.matcher(logs.getOutput()).find());
+    assertTrue(logs.getOutput().contains(
+        "Number of suppressed read-lock reports: 2"));
+  }
+
+}
\ No newline at end of file