HDFS-7165. Separate block metrics for files with replication count 1. (Zhe Zhang via wang)
This commit is contained in:
parent
d71d40a63d
commit
8c5b23b547
|
@ -292,6 +292,9 @@ Release 2.7.0 - UNRELEASED
|
|||
|
||||
HDFS-6824. Additional user documentation for HDFS encryption. (wang)
|
||||
|
||||
HDFS-7165. Separate block metrics for files with replication count 1.
|
||||
(Zhe Zhang via wang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -2355,6 +2355,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
return namenode.getStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns count of blocks with replication factor 1 and have
|
||||
* lost the only replica.
|
||||
* @throws IOException
|
||||
*/
|
||||
public long getMissingReplOneBlocksCount() throws IOException {
|
||||
return namenode.getStats()[ClientProtocol.
|
||||
GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns count of blocks with one of more replica missing.
|
||||
* @throws IOException
|
||||
|
|
|
@ -930,6 +930,16 @@ public class DistributedFileSystem extends FileSystem {
|
|||
return dfs.getMissingBlocksCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns count of blocks with replication factor 1 and have
|
||||
* lost the only replica.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public long getMissingReplOneBlocksCount() throws IOException {
|
||||
return dfs.getMissingReplOneBlocksCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns count of blocks with one of more replica missing.
|
||||
*
|
||||
|
|
|
@ -652,6 +652,7 @@ public interface ClientProtocol {
|
|||
public int GET_STATS_UNDER_REPLICATED_IDX = 3;
|
||||
public int GET_STATS_CORRUPT_BLOCKS_IDX = 4;
|
||||
public int GET_STATS_MISSING_BLOCKS_IDX = 5;
|
||||
public int GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX = 6;
|
||||
|
||||
/**
|
||||
* Get a set of statistics about the filesystem.
|
||||
|
@ -663,7 +664,8 @@ public interface ClientProtocol {
|
|||
* <li> [3] contains number of under replicated blocks in the system.</li>
|
||||
* <li> [4] contains number of blocks with a corrupt replica. </li>
|
||||
* <li> [5] contains number of blocks without any good replicas left. </li>
|
||||
* <li> [6] contains the total used space of the block pool. </li>
|
||||
* <li> [6] contains number of blocks which have replication factor
|
||||
* 1 and have lost the only replica. </li>
|
||||
* </ul>
|
||||
* Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of
|
||||
* actual numbers to index into the array.
|
||||
|
|
|
@ -1545,13 +1545,15 @@ public class PBHelper {
|
|||
}
|
||||
|
||||
public static long[] convert(GetFsStatsResponseProto res) {
|
||||
long[] result = new long[6];
|
||||
long[] result = new long[7];
|
||||
result[ClientProtocol.GET_STATS_CAPACITY_IDX] = res.getCapacity();
|
||||
result[ClientProtocol.GET_STATS_USED_IDX] = res.getUsed();
|
||||
result[ClientProtocol.GET_STATS_REMAINING_IDX] = res.getRemaining();
|
||||
result[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = res.getUnderReplicated();
|
||||
result[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = res.getCorruptBlocks();
|
||||
result[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = res.getMissingBlocks();
|
||||
result[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
|
||||
res.getMissingReplOneBlocks();
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -1573,6 +1575,9 @@ public class PBHelper {
|
|||
if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX + 1)
|
||||
result.setMissingBlocks(
|
||||
fsStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]);
|
||||
if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX + 1)
|
||||
result.setMissingReplOneBlocks(
|
||||
fsStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]);
|
||||
return result.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -3438,6 +3438,11 @@ public class BlockManager {
|
|||
return this.neededReplications.getCorruptBlockSize();
|
||||
}
|
||||
|
||||
public long getMissingReplOneBlocksCount() {
|
||||
// not locking
|
||||
return this.neededReplications.getCorruptReplOneBlockSize();
|
||||
}
|
||||
|
||||
public BlockInfo addBlockCollection(BlockInfo block, BlockCollection bc) {
|
||||
return blocksMap.addBlockCollection(block, bc);
|
||||
}
|
||||
|
|
|
@ -179,7 +179,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
-1L,
|
||||
-1L,
|
||||
-1L,
|
||||
getBlockPoolUsed()};
|
||||
-1L};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -81,6 +81,9 @@ class UnderReplicatedBlocks implements Iterable<Block> {
|
|||
private final List<LightWeightLinkedSet<Block>> priorityQueues
|
||||
= new ArrayList<LightWeightLinkedSet<Block>>(LEVEL);
|
||||
|
||||
/** The number of corrupt blocks with replication factor 1 */
|
||||
private int corruptReplOneBlocks = 0;
|
||||
|
||||
/** Create an object. */
|
||||
UnderReplicatedBlocks() {
|
||||
for (int i = 0; i < LEVEL; i++) {
|
||||
|
@ -122,6 +125,11 @@ class UnderReplicatedBlocks implements Iterable<Block> {
|
|||
return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
|
||||
}
|
||||
|
||||
/** Return the number of corrupt blocks with replication factor 1 */
|
||||
synchronized int getCorruptReplOneBlockSize() {
|
||||
return corruptReplOneBlocks;
|
||||
}
|
||||
|
||||
/** Check if a block is in the neededReplication queue */
|
||||
synchronized boolean contains(Block block) {
|
||||
for(LightWeightLinkedSet<Block> set : priorityQueues) {
|
||||
|
@ -183,6 +191,10 @@ class UnderReplicatedBlocks implements Iterable<Block> {
|
|||
int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
|
||||
expectedReplicas);
|
||||
if(priorityQueues.get(priLevel).add(block)) {
|
||||
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
|
||||
expectedReplicas == 1) {
|
||||
corruptReplOneBlocks++;
|
||||
}
|
||||
if(NameNode.blockStateChangeLog.isDebugEnabled()) {
|
||||
NameNode.blockStateChangeLog.debug(
|
||||
"BLOCK* NameSystem.UnderReplicationBlock.add:"
|
||||
|
@ -205,7 +217,16 @@ class UnderReplicatedBlocks implements Iterable<Block> {
|
|||
int priLevel = getPriority(block, oldReplicas,
|
||||
decommissionedReplicas,
|
||||
oldExpectedReplicas);
|
||||
return remove(block, priLevel);
|
||||
boolean removedBlock = remove(block, priLevel);
|
||||
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
|
||||
oldExpectedReplicas == 1 &&
|
||||
removedBlock) {
|
||||
corruptReplOneBlocks--;
|
||||
assert corruptReplOneBlocks >= 0 :
|
||||
"Number of corrupt blocks with replication factor 1 " +
|
||||
"should be non-negative";
|
||||
}
|
||||
return removedBlock;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -299,6 +320,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
|
|||
+ " at priority level " + curPri);
|
||||
}
|
||||
}
|
||||
if (oldPri != curPri || expectedReplicasDelta != 0) {
|
||||
// corruptReplOneBlocks could possibly change
|
||||
if (curPri == QUEUE_WITH_CORRUPT_BLOCKS &&
|
||||
curExpectedReplicas == 1) {
|
||||
// add a new corrupt block with replication factor 1
|
||||
corruptReplOneBlocks++;
|
||||
} else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS &&
|
||||
curExpectedReplicas - expectedReplicasDelta == 1) {
|
||||
// remove an existing corrupt block with replication factor 1
|
||||
corruptReplOneBlocks--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -5298,6 +5298,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
// not locking
|
||||
return blockManager.getMissingBlocksCount();
|
||||
}
|
||||
|
||||
@Metric({"MissingReplOneBlocks", "Number of missing blocks " +
|
||||
"with replication factor 1"})
|
||||
public long getMissingReplOneBlocksCount() {
|
||||
// not locking
|
||||
return blockManager.getMissingReplOneBlocksCount();
|
||||
}
|
||||
|
||||
@Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
|
||||
public int getExpiredHeartbeats() {
|
||||
|
@ -5339,6 +5346,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
|
||||
stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
|
||||
stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
|
||||
stats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
|
||||
getMissingReplOneBlocksCount();
|
||||
return stats;
|
||||
}
|
||||
|
||||
|
@ -7605,6 +7614,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
return getMissingBlocksCount();
|
||||
}
|
||||
|
||||
@Override // NameNodeMXBean
|
||||
public long getNumberOfMissingBlocksWithReplicationFactorOne() {
|
||||
return getMissingReplOneBlocksCount();
|
||||
}
|
||||
|
||||
@Override // NameNodeMXBean
|
||||
public int getThreads() {
|
||||
return ManagementFactory.getThreadMXBean().getThreadCount();
|
||||
|
|
|
@ -147,10 +147,19 @@ public interface NameNodeMXBean {
|
|||
/**
|
||||
* Gets the total number of missing blocks on the cluster
|
||||
*
|
||||
* @return the total number of files and blocks on the cluster
|
||||
* @return the total number of missing blocks on the cluster
|
||||
*/
|
||||
public long getNumberOfMissingBlocks();
|
||||
|
||||
/**
|
||||
* Gets the total number of missing blocks on the cluster with
|
||||
* replication factor 1
|
||||
*
|
||||
* @return the total number of missing blocks on the cluster with
|
||||
* replication factor 1
|
||||
*/
|
||||
public long getNumberOfMissingBlocksWithReplicationFactorOne();
|
||||
|
||||
/**
|
||||
* Gets the number of threads.
|
||||
*
|
||||
|
|
|
@ -458,6 +458,8 @@ public class DFSAdmin extends FsShell {
|
|||
dfs.getCorruptBlocksCount());
|
||||
System.out.println("Missing blocks: " +
|
||||
dfs.getMissingBlocksCount());
|
||||
System.out.println("Missing blocks (with replication factor 1): " +
|
||||
dfs.getMissingReplOneBlocksCount());
|
||||
|
||||
System.out.println();
|
||||
|
||||
|
|
|
@ -283,6 +283,7 @@ message GetFsStatsResponseProto {
|
|||
required uint64 under_replicated = 4;
|
||||
required uint64 corrupt_blocks = 5;
|
||||
required uint64 missing_blocks = 6;
|
||||
optional uint64 missing_repl_one_blocks = 7;
|
||||
}
|
||||
|
||||
enum DatanodeReportTypeProto { // type of the datanode report
|
||||
|
|
|
@ -77,7 +77,6 @@ public class TestMissingBlocksAlert {
|
|||
Path corruptFile = new Path("/testMissingBlocks/corruptFile");
|
||||
DFSTestUtil.createFile(dfs, corruptFile, fileLen, (short)3, 0);
|
||||
|
||||
|
||||
// Corrupt the block
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, corruptFile);
|
||||
assertTrue(TestDatanodeBlockScanner.corruptReplica(block, 0));
|
||||
|
@ -120,6 +119,24 @@ public class TestMissingBlocksAlert {
|
|||
|
||||
Assert.assertEquals(0, (long)(Long) mbs.getAttribute(mxbeanName,
|
||||
"NumberOfMissingBlocks"));
|
||||
|
||||
Path replOneFile = new Path("/testMissingBlocks/replOneFile");
|
||||
DFSTestUtil.createFile(dfs, replOneFile, fileLen, (short)1, 0);
|
||||
ExtendedBlock replOneBlock = DFSTestUtil.getFirstBlock(
|
||||
dfs, replOneFile);
|
||||
assertTrue(TestDatanodeBlockScanner.corruptReplica(
|
||||
replOneBlock, 0));
|
||||
|
||||
// read the file so that the corrupt block is reported to NN
|
||||
in = dfs.open(replOneFile);
|
||||
try {
|
||||
in.readFully(new byte[fileLen]);
|
||||
} catch (ChecksumException ignored) { // checksum error is expected.
|
||||
}
|
||||
in.close();
|
||||
assertEquals(1, dfs.getMissingReplOneBlocksCount());
|
||||
Assert.assertEquals(1, (long)(Long) mbs.getAttribute(mxbeanName,
|
||||
"NumberOfMissingBlocksWithReplicationFactorOne"));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
|
|
@ -19,10 +19,14 @@
|
|||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestUnderReplicatedBlockQueues extends Assert {
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestUnderReplicatedBlockQueues {
|
||||
|
||||
/**
|
||||
* Test that adding blocks with different replication counts puts them
|
||||
|
@ -36,6 +40,7 @@ public class TestUnderReplicatedBlockQueues extends Assert {
|
|||
Block block2 = new Block(2);
|
||||
Block block_very_under_replicated = new Block(3);
|
||||
Block block_corrupt = new Block(4);
|
||||
Block block_corrupt_repl_one = new Block(5);
|
||||
|
||||
//add a block with a single entry
|
||||
assertAdded(queues, block1, 1, 0, 3);
|
||||
|
@ -64,6 +69,16 @@ public class TestUnderReplicatedBlockQueues extends Assert {
|
|||
assertInLevel(queues, block_very_under_replicated,
|
||||
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
|
||||
|
||||
//insert a corrupt block with replication factor 1
|
||||
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
|
||||
assertEquals(2, queues.getCorruptBlockSize());
|
||||
assertEquals(1, queues.getCorruptReplOneBlockSize());
|
||||
queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2);
|
||||
assertEquals(0, queues.getCorruptReplOneBlockSize());
|
||||
queues.update(block_corrupt, 0, 0, 1, 0, -2);
|
||||
assertEquals(1, queues.getCorruptReplOneBlockSize());
|
||||
queues.update(block_very_under_replicated, 0, 0, 1, -4, -24);
|
||||
assertEquals(2, queues.getCorruptReplOneBlockSize());
|
||||
}
|
||||
|
||||
private void assertAdded(UnderReplicatedBlocks queues,
|
||||
|
|
|
@ -295,6 +295,7 @@ public class TestNameNodeMetrics {
|
|||
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
|
||||
assertGauge("UnderReplicatedBlocks", 1L, rb);
|
||||
assertGauge("MissingBlocks", 1L, rb);
|
||||
assertGauge("MissingReplOneBlocks", 1L, rb);
|
||||
fs.delete(file, true);
|
||||
waitForDnMetricValue(NS_METRICS, "UnderReplicatedBlocks", 0L);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue