HDFS-8883. NameNode Metrics : Add FSNameSystem lock Queue Length. Contributed by Anu Engineer.
(cherry picked from commita7862d5fe4
) (cherry picked from commit27ccbd51f6
)
This commit is contained in:
parent
a1d8e421f6
commit
e5cb9d9e18
|
@ -230,6 +230,7 @@ Each metrics record contains tags such as HAState and Hostname as additional inf
|
|||
| `BlockCapacity` | Current number of block capacity |
|
||||
| `StaleDataNodes` | Current number of DataNodes marked stale due to delayed heartbeat |
|
||||
| `TotalFiles` | Current number of files and directories (same as FilesTotal) |
|
||||
| `LockQueueLength` | Number of threads waiting to acquire FSNameSystem lock |
|
||||
|
||||
JournalNode
|
||||
-----------
|
||||
|
|
|
@ -4936,6 +4936,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return dir.ezManager.getNumEncryptionZones();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the length of the wait Queue for the FSNameSystemLock.
|
||||
*
|
||||
* A larger number here indicates lots of threads are waiting for
|
||||
* FSNameSystemLock.
|
||||
*
|
||||
* @return int - Number of Threads waiting to acquire FSNameSystemLock
|
||||
*/
|
||||
@Override
|
||||
@Metric({"LockQueueLength", "Number of threads waiting to " +
|
||||
"acquire FSNameSystemLock"})
|
||||
public int getFsLockQueueLength() {
|
||||
return fsLock.getQueueLength();
|
||||
}
|
||||
|
||||
int getNumberOfDatanodes(DatanodeReportType type) {
|
||||
readLock();
|
||||
try {
|
||||
|
|
|
@ -59,4 +59,15 @@ class FSNamesystemLock implements ReadWriteLock {
|
|||
public boolean isWriteLockedByCurrentThread() {
|
||||
return coarseLock.isWriteLockedByCurrentThread();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the QueueLength of waiting threads.
|
||||
*
|
||||
* A larger number indicates greater lock contention.
|
||||
*
|
||||
* @return int - Number of threads waiting on this lock
|
||||
*/
|
||||
public int getQueueLength() {
|
||||
return coarseLock.getQueueLength();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -189,4 +189,13 @@ public interface FSNamesystemMBean {
|
|||
* Return the number of encryption zones in the system.
|
||||
*/
|
||||
int getNumEncryptionZones();
|
||||
|
||||
/**
|
||||
* Returns the length of the wait Queue for the FSNameSystemLock.
|
||||
*
|
||||
* A larger number here indicates lots of threads are waiting for
|
||||
* FSNameSystemLock.
|
||||
* @return int - Number of Threads waiting to acquire FSNameSystemLock
|
||||
*/
|
||||
int getFsLockQueueLength();
|
||||
}
|
||||
|
|
|
@ -37,10 +37,15 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
|||
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class TestFSNamesystem {
|
||||
|
||||
@After
|
||||
|
@ -213,4 +218,27 @@ public class TestFSNamesystem {
|
|||
fsn.imageLoadComplete();
|
||||
assertTrue(fsn.isImageLoaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFSLockGetWaiterCount() throws InterruptedException {
|
||||
final int threadCount = 3;
|
||||
final CountDownLatch latch = new CountDownLatch(threadCount);
|
||||
final FSNamesystemLock rwLock = new FSNamesystemLock(true);
|
||||
rwLock.writeLock().lock();
|
||||
ExecutorService helper = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
for (int x = 0; x < threadCount; x++) {
|
||||
helper.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
rwLock.readLock().lock();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
latch.await();
|
||||
Assert.assertEquals("Expected number of blocked thread not found",
|
||||
threadCount, rwLock.getQueueLength());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
|
@ -172,7 +173,7 @@ public class TestNameNodeMXBean {
|
|||
// This will cause the first dir to fail.
|
||||
File failedNameDir = new File(nameDirUris.iterator().next());
|
||||
assertEquals(0, FileUtil.chmod(
|
||||
new File(failedNameDir, "current").getAbsolutePath(), "000"));
|
||||
new File(failedNameDir, "current").getAbsolutePath(), "000"));
|
||||
cluster.getNameNodeRpc().rollEditLog();
|
||||
|
||||
nameDirStatuses = (String) (mbs.getAttribute(mxbeanName,
|
||||
|
@ -372,4 +373,23 @@ public class TestNameNodeMXBean {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testQueueLength() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
ObjectName mxbeanNameFs =
|
||||
new ObjectName("Hadoop:service=NameNode,name=FSNamesystem");
|
||||
int queueLength = (int) mbs.getAttribute(mxbeanNameFs, "LockQueueLength");
|
||||
assertEquals(0, queueLength);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue