HDFS-9425. Expose number of blocks per volume as a metric (Contributed by Brahma Reddy Battula)
(cherry picked from commit342c9572bf
) (cherry picked from commit2e3c35a835
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
This commit is contained in:
parent
8ca8d218a4
commit
426f213088
|
@ -997,6 +997,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9768. Reuse ObjectMapper instance in HDFS to improve the performance.
|
HDFS-9768. Reuse ObjectMapper instance in HDFS to improve the performance.
|
||||||
(Lin Yiqun via aajisaka)
|
(Lin Yiqun via aajisaka)
|
||||||
|
|
||||||
|
HDFS-9425. Expose number of blocks per volume as a metric
|
||||||
|
(Brahma Reddy Battula via vinayakumarb)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-8091: ACLStatus and XAttributes should be presented to
|
HDFS-8091: ACLStatus and XAttributes should be presented to
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.io.RandomAccessFile;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Scanner;
|
import java.util.Scanner;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -85,6 +86,7 @@ class BlockPoolSlice {
|
||||||
private final boolean deleteDuplicateReplicas;
|
private final boolean deleteDuplicateReplicas;
|
||||||
private static final String REPLICA_CACHE_FILE = "replicas";
|
private static final String REPLICA_CACHE_FILE = "replicas";
|
||||||
private final long replicaCacheExpiry = 5*60*1000;
|
private final long replicaCacheExpiry = 5*60*1000;
|
||||||
|
private AtomicLong numOfBlocks = new AtomicLong();
|
||||||
|
|
||||||
// TODO:FEDERATION scalability issue - a thread per DU is needed
|
// TODO:FEDERATION scalability issue - a thread per DU is needed
|
||||||
private final DU dfsUsage;
|
private final DU dfsUsage;
|
||||||
|
@ -267,7 +269,11 @@ class BlockPoolSlice {
|
||||||
*/
|
*/
|
||||||
File createTmpFile(Block b) throws IOException {
|
File createTmpFile(Block b) throws IOException {
|
||||||
File f = new File(tmpDir, b.getBlockName());
|
File f = new File(tmpDir, b.getBlockName());
|
||||||
return DatanodeUtil.createTmpFile(b, f);
|
File tmpFile = DatanodeUtil.createTmpFile(b, f);
|
||||||
|
// If any exception during creation, its expected that counter will not be
|
||||||
|
// incremented, So no need to decrement
|
||||||
|
incrNumBlocks();
|
||||||
|
return tmpFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -276,7 +282,11 @@ class BlockPoolSlice {
|
||||||
*/
|
*/
|
||||||
File createRbwFile(Block b) throws IOException {
|
File createRbwFile(Block b) throws IOException {
|
||||||
File f = new File(rbwDir, b.getBlockName());
|
File f = new File(rbwDir, b.getBlockName());
|
||||||
return DatanodeUtil.createTmpFile(b, f);
|
File rbwFile = DatanodeUtil.createTmpFile(b, f);
|
||||||
|
// If any exception during creation, its expected that counter will not be
|
||||||
|
// incremented, So no need to decrement
|
||||||
|
incrNumBlocks();
|
||||||
|
return rbwFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
File addBlock(Block b, File f) throws IOException {
|
File addBlock(Block b, File f) throws IOException {
|
||||||
|
@ -487,6 +497,9 @@ class BlockPoolSlice {
|
||||||
} else {
|
} else {
|
||||||
lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
|
lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
|
||||||
}
|
}
|
||||||
|
if (oldReplica == null) {
|
||||||
|
incrNumBlocks();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -819,4 +832,16 @@ class BlockPoolSlice {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void incrNumBlocks() {
|
||||||
|
numOfBlocks.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
void decrNumBlocks() {
|
||||||
|
numOfBlocks.decrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getNumOfBlocks() {
|
||||||
|
return numOfBlocks.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -968,6 +968,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
||||||
// Finalize the copied files
|
// Finalize the copied files
|
||||||
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
||||||
|
synchronized (this) {
|
||||||
|
// Increment numBlocks here as this block moved without knowing to BPS
|
||||||
|
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
|
||||||
|
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
|
||||||
|
}
|
||||||
|
|
||||||
removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
|
removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
|
||||||
oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
|
oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
|
||||||
|
@ -2602,6 +2607,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
final long reservedSpace; // size of space reserved for non-HDFS
|
final long reservedSpace; // size of space reserved for non-HDFS
|
||||||
final long reservedSpaceForReplicas; // size of space reserved RBW or
|
final long reservedSpaceForReplicas; // size of space reserved RBW or
|
||||||
// re-replication
|
// re-replication
|
||||||
|
final long numBlocks;
|
||||||
|
|
||||||
VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
|
VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
|
||||||
this.directory = v.toString();
|
this.directory = v.toString();
|
||||||
|
@ -2609,6 +2615,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
this.freeSpace = freeSpace;
|
this.freeSpace = freeSpace;
|
||||||
this.reservedSpace = v.getReserved();
|
this.reservedSpace = v.getReserved();
|
||||||
this.reservedSpaceForReplicas = v.getReservedForReplicas();
|
this.reservedSpaceForReplicas = v.getReservedForReplicas();
|
||||||
|
this.numBlocks = v.getNumBlocks();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2643,6 +2650,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
innerInfo.put("freeSpace", v.freeSpace);
|
innerInfo.put("freeSpace", v.freeSpace);
|
||||||
innerInfo.put("reservedSpace", v.reservedSpace);
|
innerInfo.put("reservedSpace", v.reservedSpace);
|
||||||
innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
|
innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
|
||||||
|
innerInfo.put("numBlocks", v.numBlocks);
|
||||||
info.put(v.directory, innerInfo);
|
info.put(v.directory, innerInfo);
|
||||||
}
|
}
|
||||||
return info;
|
return info;
|
||||||
|
@ -2773,8 +2781,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
synchronized (FsDatasetImpl.this) {
|
synchronized (FsDatasetImpl.this) {
|
||||||
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
|
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
|
||||||
|
|
||||||
targetVolume.incDfsUsed(bpId,
|
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
|
||||||
savedFiles[0].length() + savedFiles[1].length());
|
+ savedFiles[1].length());
|
||||||
|
|
||||||
// Update metrics (ignore the metadata file size)
|
// Update metrics (ignore the metadata file size)
|
||||||
datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
|
datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
|
||||||
|
|
|
@ -283,21 +283,35 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
}
|
}
|
||||||
|
|
||||||
void onBlockFileDeletion(String bpid, long value) {
|
void onBlockFileDeletion(String bpid, long value) {
|
||||||
decDfsUsed(bpid, value);
|
decDfsUsedAndNumBlocks(bpid, value, true);
|
||||||
if (isTransientStorage()) {
|
if (isTransientStorage()) {
|
||||||
dataset.releaseLockedMemory(value, true);
|
dataset.releaseLockedMemory(value, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void onMetaFileDeletion(String bpid, long value) {
|
void onMetaFileDeletion(String bpid, long value) {
|
||||||
decDfsUsed(bpid, value);
|
decDfsUsedAndNumBlocks(bpid, value, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void decDfsUsed(String bpid, long value) {
|
private void decDfsUsedAndNumBlocks(String bpid, long value,
|
||||||
|
boolean blockFileDeleted) {
|
||||||
synchronized(dataset) {
|
synchronized(dataset) {
|
||||||
BlockPoolSlice bp = bpSlices.get(bpid);
|
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||||
if (bp != null) {
|
if (bp != null) {
|
||||||
bp.decDfsUsed(value);
|
bp.decDfsUsed(value);
|
||||||
|
if (blockFileDeleted) {
|
||||||
|
bp.decrNumBlocks();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void incDfsUsedAndNumBlocks(String bpid, long value) {
|
||||||
|
synchronized (dataset) {
|
||||||
|
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||||
|
if (bp != null) {
|
||||||
|
bp.incDfsUsed(value);
|
||||||
|
bp.incrNumBlocks();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -846,7 +860,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
|
getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long getNumBlocks() {
|
||||||
|
long numBlocks = 0;
|
||||||
|
for (BlockPoolSlice s : bpSlices.values()) {
|
||||||
|
numBlocks += s.getNumOfBlocks();
|
||||||
|
}
|
||||||
|
return numBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return currentDir.getAbsolutePath();
|
return currentDir.getAbsolutePath();
|
||||||
|
|
|
@ -18,15 +18,23 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import javax.management.MBeanServer;
|
import javax.management.MBeanServer;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class for testing {@link DataNodeMXBean} implementation
|
* Class for testing {@link DataNodeMXBean} implementation
|
||||||
|
@ -78,4 +86,51 @@ public class TestDataNodeMXBean {
|
||||||
private static String replaceDigits(final String s) {
|
private static String replaceDigits(final String s) {
|
||||||
return s.replaceAll("[0-9]+", "_DIGITS_");
|
return s.replaceAll("[0-9]+", "_DIGITS_");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDataNodeMXBeanBlockCount() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
|
||||||
|
try {
|
||||||
|
List<DataNode> datanodes = cluster.getDataNodes();
|
||||||
|
assertEquals(datanodes.size(), 1);
|
||||||
|
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName mxbeanName =
|
||||||
|
new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
DFSTestUtil.createFile(fs, new Path("/tmp.txt" + i), 1024, (short) 1,
|
||||||
|
1L);
|
||||||
|
}
|
||||||
|
assertEquals("Before restart DN", 5, getTotalNumBlocks(mbs, mxbeanName));
|
||||||
|
cluster.restartDataNode(0);
|
||||||
|
cluster.waitActive();
|
||||||
|
assertEquals("After restart DN", 5, getTotalNumBlocks(mbs, mxbeanName));
|
||||||
|
fs.delete(new Path("/tmp.txt1"), true);
|
||||||
|
// Wait till replica gets deleted on disk.
|
||||||
|
Thread.sleep(5000);
|
||||||
|
assertEquals("After delete one file", 4,
|
||||||
|
getTotalNumBlocks(mbs, mxbeanName));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
int getTotalNumBlocks(MBeanServer mbs, ObjectName mxbeanName)
|
||||||
|
throws Exception {
|
||||||
|
int totalBlocks = 0;
|
||||||
|
String volumeInfo = (String) mbs.getAttribute(mxbeanName, "VolumeInfo");
|
||||||
|
Map<?, ?> m = (Map<?, ?>) JSON.parse(volumeInfo);
|
||||||
|
Collection<Map<String, Long>> values =
|
||||||
|
(Collection<Map<String, Long>>) m.values();
|
||||||
|
for (Map<String, Long> volumeInfoMap : values) {
|
||||||
|
totalBlocks += volumeInfoMap.get("numBlocks");
|
||||||
|
}
|
||||||
|
return totalBlocks;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue