HDFS-7165. Separate block metrics for files with replication count 1. Contributed by Zhe Zhang.

This commit is contained in:
Andrew Wang 2014-10-30 10:21:12 -07:00
parent 3e8544c5f2
commit ca6a21f7d1
15 changed files with 135 additions and 9 deletions

View File

@ -45,6 +45,9 @@ Release 2.7.0 - UNRELEASED
HDFS-6824. Additional user documentation for HDFS encryption. (wang)
HDFS-7165. Separate block metrics for files with replication count 1.
(Zhe Zhang via wang)
HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)
HDFS-7257. Add the time of last HA state transition to NN's /jmx page.

View File

@ -2363,6 +2363,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return namenode.getStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
}
/**
 * Returns the number of blocks that have replication factor 1 and
 * have lost their only replica.
 *
 * @return count of missing blocks with replication factor 1
 * @throws IOException propagated from {@code namenode.getStats()}
 */
public long getMissingReplOneBlocksCount() throws IOException {
  final long[] stats = namenode.getStats();
  return stats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX];
}
/**
* Returns count of blocks with one or more replicas missing.
* @throws IOException

View File

@ -974,6 +974,16 @@ public class DistributedFileSystem extends FileSystem {
return dfs.getMissingBlocksCount();
}
/**
 * Returns the count of blocks that have replication factor 1 and
 * have lost their only replica.
 *
 * @return number of missing blocks with replication factor 1
 * @throws IOException propagated from the underlying DFSClient call
 */
public long getMissingReplOneBlocksCount() throws IOException {
return dfs.getMissingReplOneBlocksCount();
}
/**
* Returns count of blocks with one or more replicas missing.
*

View File

@ -652,6 +652,7 @@ public interface ClientProtocol {
public int GET_STATS_UNDER_REPLICATED_IDX = 3;
public int GET_STATS_CORRUPT_BLOCKS_IDX = 4;
public int GET_STATS_MISSING_BLOCKS_IDX = 5;
public int GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX = 6;
/**
* Get a set of statistics about the filesystem.
@ -663,7 +664,8 @@ public interface ClientProtocol {
* <li> [3] contains number of under replicated blocks in the system.</li>
* <li> [4] contains number of blocks with a corrupt replica. </li>
* <li> [5] contains number of blocks without any good replicas left. </li>
* <li> [6] contains the total used space of the block pool. </li>
* <li> [6] contains number of blocks which have replication factor
* 1 and have lost the only replica. </li>
* </ul>
* Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of
* actual numbers to index into the array.

View File

@ -1548,13 +1548,15 @@ public class PBHelper {
}
/**
 * Converts a GetFsStatsResponseProto into the long[] stats vector
 * defined by ClientProtocol#getStats(), indexed by the
 * ClientProtocol.GET_STATS_*_IDX constants.
 */
public static long[] convert(GetFsStatsResponseProto res) {
  // 7 slots: index 6 (GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX) carries the
  // count of missing blocks with replication factor 1. The stale 6-slot
  // allocation has been removed — having both declarations in the body is
  // a compile error (duplicate local variable 'result').
  long[] result = new long[7];
  result[ClientProtocol.GET_STATS_CAPACITY_IDX] = res.getCapacity();
  result[ClientProtocol.GET_STATS_USED_IDX] = res.getUsed();
  result[ClientProtocol.GET_STATS_REMAINING_IDX] = res.getRemaining();
  result[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] =
      res.getUnderReplicated();
  result[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] =
      res.getCorruptBlocks();
  result[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] =
      res.getMissingBlocks();
  // NOTE: missing_repl_one_blocks is an optional proto field; a response
  // from an older NameNode that never set it yields the proto default (0).
  result[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
      res.getMissingReplOneBlocks();
  return result;
}
@ -1576,6 +1578,9 @@ public class PBHelper {
if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX + 1)
result.setMissingBlocks(
fsStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]);
if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX + 1)
result.setMissingReplOneBlocks(
fsStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]);
return result.build();
}

View File

@ -3443,6 +3443,11 @@ public class BlockManager {
return this.neededReplications.getCorruptBlockSize();
}
/**
 * @return the number of missing blocks whose expected replication
 *     factor is 1, i.e. the lone replica has been lost; delegates to
 *     UnderReplicatedBlocks#getCorruptReplOneBlockSize().
 */
public long getMissingReplOneBlocksCount() {
// not locking — read without the namesystem lock; value may be slightly stale
return this.neededReplications.getCorruptReplOneBlockSize();
}
/**
 * Adds {@code block} to the blocks map under the given block collection.
 *
 * @return the BlockInfo as recorded by the blocks map
 */
public BlockInfo addBlockCollection(BlockInfo block, BlockCollection bc) {
return blocksMap.addBlockCollection(block, bc);
}

View File

@ -179,7 +179,7 @@ class HeartbeatManager implements DatanodeStatistics {
-1L,
-1L,
-1L,
getBlockPoolUsed()};
-1L};
}
@Override

View File

@ -87,7 +87,9 @@ class UnderReplicatedBlocks implements Iterable<Block> {
/** Stores the replication index for each priority */
private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
/** The number of corrupt blocks with replication factor 1 */
private int corruptReplOneBlocks = 0;
/** Create an object. */
UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) {
@ -130,6 +132,11 @@ class UnderReplicatedBlocks implements Iterable<Block> {
return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
}
/**
 * Returns the number of corrupt blocks whose expected replication
 * factor is 1. The counter is maintained incrementally by the
 * add/remove/update paths of this queue.
 *
 * @return current value of the corrupt-repl-one counter
 */
synchronized int getCorruptReplOneBlockSize() {
return corruptReplOneBlocks;
}
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
for(LightWeightLinkedSet<Block> set : priorityQueues) {
@ -191,6 +198,10 @@ class UnderReplicatedBlocks implements Iterable<Block> {
int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
expectedReplicas == 1) {
corruptReplOneBlocks++;
}
if(NameNode.blockStateChangeLog.isDebugEnabled()) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.add:"
@ -213,7 +224,16 @@ class UnderReplicatedBlocks implements Iterable<Block> {
int priLevel = getPriority(block, oldReplicas,
decommissionedReplicas,
oldExpectedReplicas);
return remove(block, priLevel);
boolean removedBlock = remove(block, priLevel);
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
oldExpectedReplicas == 1 &&
removedBlock) {
corruptReplOneBlocks--;
assert corruptReplOneBlocks >= 0 :
"Number of corrupt blocks with replication factor 1 " +
"should be non-negative";
}
return removedBlock;
}
/**
@ -307,6 +327,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
+ " at priority level " + curPri);
}
}
if (oldPri != curPri || expectedReplicasDelta != 0) {
// corruptReplOneBlocks could possibly change
if (curPri == QUEUE_WITH_CORRUPT_BLOCKS &&
curExpectedReplicas == 1) {
// add a new corrupt block with replication factor 1
corruptReplOneBlocks++;
} else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS &&
curExpectedReplicas - expectedReplicasDelta == 1) {
// remove an existing corrupt block with replication factor 1
corruptReplOneBlocks--;
}
}
}
/**

View File

@ -5310,6 +5310,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// not locking
return blockManager.getMissingBlocksCount();
}
/** Number of missing blocks with replication factor 1, exposed as a metric. */
@Metric({"MissingReplOneBlocks", "Number of missing blocks " +
"with replication factor 1"})
public long getMissingReplOneBlocksCount() {
// not locking — delegates to BlockManager without taking the FSN lock
return blockManager.getMissingReplOneBlocksCount();
}
@Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
public int getExpiredHeartbeats() {
@ -5351,6 +5358,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
stats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
getMissingReplOneBlocksCount();
return stats;
}
@ -7617,6 +7626,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return getMissingBlocksCount();
}
@Override // NameNodeMXBean
public long getNumberOfMissingBlocksWithReplicationFactorOne() {
// Same value as the MissingReplOneBlocks metric; contract is documented
// on the NameNodeMXBean interface.
return getMissingReplOneBlocksCount();
}
@Override // NameNodeMXBean
public int getThreads() {
return ManagementFactory.getThreadMXBean().getThreadCount();

View File

@ -147,10 +147,19 @@ public interface NameNodeMXBean {
/**
 * Gets the total number of missing blocks on the cluster.
 *
 * @return the total number of missing blocks on the cluster
 */
public long getNumberOfMissingBlocks();
/**
 * Gets the total number of missing blocks on the cluster with
 * replication factor 1.
 *
 * @return the total number of missing blocks on the cluster with
 *     replication factor 1
 */
public long getNumberOfMissingBlocksWithReplicationFactorOne();
/**
* Gets the number of threads.
*

View File

@ -462,6 +462,8 @@ public class DFSAdmin extends FsShell {
dfs.getCorruptBlocksCount());
System.out.println("Missing blocks: " +
dfs.getMissingBlocksCount());
System.out.println("Missing blocks (with replication factor 1): " +
dfs.getMissingReplOneBlocksCount());
System.out.println();

View File

@ -283,6 +283,7 @@ message GetFsStatsResponseProto {
required uint64 under_replicated = 4;
required uint64 corrupt_blocks = 5;
required uint64 missing_blocks = 6;
optional uint64 missing_repl_one_blocks = 7;
}
enum DatanodeReportTypeProto { // type of the datanode report

View File

@ -77,7 +77,6 @@ public class TestMissingBlocksAlert {
Path corruptFile = new Path("/testMissingBlocks/corruptFile");
DFSTestUtil.createFile(dfs, corruptFile, fileLen, (short)3, 0);
// Corrupt the block
ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, corruptFile);
assertTrue(TestDatanodeBlockScanner.corruptReplica(block, 0));
@ -120,6 +119,24 @@ public class TestMissingBlocksAlert {
Assert.assertEquals(0, (long)(Long) mbs.getAttribute(mxbeanName,
"NumberOfMissingBlocks"));
Path replOneFile = new Path("/testMissingBlocks/replOneFile");
DFSTestUtil.createFile(dfs, replOneFile, fileLen, (short)1, 0);
ExtendedBlock replOneBlock = DFSTestUtil.getFirstBlock(
dfs, replOneFile);
assertTrue(TestDatanodeBlockScanner.corruptReplica(
replOneBlock, 0));
// read the file so that the corrupt block is reported to NN
in = dfs.open(replOneFile);
try {
in.readFully(new byte[fileLen]);
} catch (ChecksumException ignored) { // checksum error is expected.
}
in.close();
assertEquals(1, dfs.getMissingReplOneBlocksCount());
Assert.assertEquals(1, (long)(Long) mbs.getAttribute(mxbeanName,
"NumberOfMissingBlocksWithReplicationFactorOne"));
} finally {
if (cluster != null) {
cluster.shutdown();

View File

@ -19,10 +19,14 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.junit.Assert;
import org.junit.Test;
public class TestUnderReplicatedBlockQueues extends Assert {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class TestUnderReplicatedBlockQueues {
/**
* Test that adding blocks with different replication counts puts them
@ -36,6 +40,7 @@ public class TestUnderReplicatedBlockQueues extends Assert {
Block block2 = new Block(2);
Block block_very_under_replicated = new Block(3);
Block block_corrupt = new Block(4);
Block block_corrupt_repl_one = new Block(5);
//add a block with a single entry
assertAdded(queues, block1, 1, 0, 3);
@ -64,6 +69,16 @@ public class TestUnderReplicatedBlockQueues extends Assert {
assertInLevel(queues, block_very_under_replicated,
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
//insert a corrupt block with replication factor 1
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
assertEquals(2, queues.getCorruptBlockSize());
assertEquals(1, queues.getCorruptReplOneBlockSize());
queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2);
assertEquals(0, queues.getCorruptReplOneBlockSize());
queues.update(block_corrupt, 0, 0, 1, 0, -2);
assertEquals(1, queues.getCorruptReplOneBlockSize());
queues.update(block_very_under_replicated, 0, 0, 1, -4, -24);
assertEquals(2, queues.getCorruptReplOneBlockSize());
}
private void assertAdded(UnderReplicatedBlocks queues,

View File

@ -289,6 +289,7 @@ public class TestNameNodeMetrics {
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("UnderReplicatedBlocks", 1L, rb);
assertGauge("MissingBlocks", 1L, rb);
assertGauge("MissingReplOneBlocks", 1L, rb);
fs.delete(file, true);
waitForDnMetricValue(NS_METRICS, "UnderReplicatedBlocks", 0L);
}