HDFS-12043. Add counters for block re-replication. Contributed by Chen Liang.

Author: Arpit Agarwal
Date:   2017-08-02 16:45:08 -07:00
Commit: 45cde760b0 (parent 223229b99c)

4 changed files with 118 additions and 3 deletions
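
This change adds three NameNode counters (SuccessfulReReplications, NumTimesReReplicationNotScheduled, TimeoutReReplications), published in the NameNodeActivity metrics record. A minimal sketch of how the new counters could be read from a live NameNode over JMX follows; it is not part of this commit, and the remote JMX host and port are assumptions that require starting the NameNode JVM with the com.sun.management.jmxremote options:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ReReplicationCounters {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint: a NameNode with remote JMX enabled on port 8004.
    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://localhost:8004/jmxrmi");
    try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection conn = jmxc.getMBeanServerConnection();
      // NameNodeMetrics counters are exposed on this MBean.
      ObjectName name =
          new ObjectName("Hadoop:service=NameNode,name=NameNodeActivity");
      for (String counter : new String[] {"SuccessfulReReplications",
          "NumTimesReReplicationNotScheduled", "TimeoutReReplications"}) {
        System.out.println(counter + " = " + conn.getAttribute(name, counter));
      }
    }
  }
}

The attribute names match the @Metric field names with the first letter capitalized, which is also how the new test below queries them.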

BlockManager.java

@@ -1629,7 +1629,8 @@ public class BlockManager implements BlockStatsMXBean {
         (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
   }
 
-  private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
+  @VisibleForTesting
+  ReplicationWork scheduleReplication(BlockInfo block, int priority) {
     // skip abandoned block or block reopened for append
     if (block.isDeleted() || !block.isCompleteOrCommitted()) {
       // remove from neededReplications
@@ -1647,6 +1648,7 @@ public class BlockManager implements BlockStatsMXBean {
         numReplicas);
     if (srcNode == null) { // block can not be replicated from any node
       LOG.debug("Block " + block + " cannot be repl from any node");
+      NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
       return null;
     }
@@ -1659,6 +1661,7 @@ public class BlockManager implements BlockStatsMXBean {
       neededReplications.remove(block, priority);
       blockLog.debug("BLOCK* Removing {} from neededReplications as" +
           " it has enough replicas", block);
+      NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
       return null;
     }
@@ -3356,7 +3359,9 @@ public class BlockManager implements BlockStatsMXBean {
     BlockInfo storedBlock = getStoredBlock(block);
     if (storedBlock != null &&
         block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
-      pendingReplications.decrement(storedBlock, node);
+      if (pendingReplications.decrement(storedBlock, node)) {
+        NameNode.getNameNodeMetrics().incSuccessfulReReplications();
+      }
     }
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);

PendingReplicationBlocks.java

@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
@@ -95,8 +96,10 @@ class PendingReplicationBlocks {
    * for this block.
    *
    * @param dn The DataNode that finishes the replication
+   * @return true if the block is decremented to 0 and got removed.
    */
-  void decrement(BlockInfo block, DatanodeDescriptor dn) {
+  boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
+    boolean removed = false;
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
@@ -106,9 +109,11 @@ class PendingReplicationBlocks {
         found.decrementReplicas(dn);
         if (found.getNumReplicas() <= 0) {
           pendingReplications.remove(block);
+          removed = true;
         }
       }
     }
+    return removed;
   }
 
   /**
@@ -265,6 +270,7 @@ class PendingReplicationBlocks {
             timedOutItems.add(block);
           }
           LOG.warn("PendingReplicationMonitor timed out " + block);
+          NameNode.getNameNodeMetrics().incTimeoutReReplications();
           iter.remove();
         }
       }

NameNodeMetrics.java

@@ -58,6 +58,12 @@ public class NameNodeMetrics {
   @Metric MutableCounterLong createSymlinkOps;
   @Metric MutableCounterLong getLinkTargetOps;
   @Metric MutableCounterLong filesInGetListingOps;
+  @Metric("Number of successful re-replications")
+  MutableCounterLong successfulReReplications;
+  @Metric("Number of times we failed to schedule a block re-replication.")
+  MutableCounterLong numTimesReReplicationNotScheduled;
+  @Metric("Number of timed out block re-replications")
+  MutableCounterLong timeoutReReplications;
   @Metric("Number of allowSnapshot operations")
   MutableCounterLong allowSnapshotOps;
   @Metric("Number of disallowSnapshot operations")
@@ -305,6 +311,18 @@ public class NameNodeMetrics {
     transactionsBatchedInSync.incr(count);
   }
 
+  public void incSuccessfulReReplications() {
+    successfulReReplications.incr();
+  }
+
+  public void incNumTimesReReplicationNotScheduled() {
+    numTimesReReplicationNotScheduled.incr();
+  }
+
+  public void incTimeoutReReplications() {
+    timeoutReReplications.incr();
+  }
+
   public void addSync(long elapsed) {
     syncs.add(elapsed);
     for (MutableQuantiles q : syncsQuantiles) {
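
For background, the @Metric fields above follow the standard Hadoop metrics2 pattern: once a source object is registered with the metrics system, each annotated MutableCounterLong is published under the field name with its first letter capitalized (successfulReReplications is exposed as SuccessfulReReplications). Below is a minimal, self-contained sketch of that pattern; the class, metric, and registry names are illustrative and not part of this commit:

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

// Hypothetical metrics source, for illustration only.
@Metrics(about = "Demo source", context = "dfs")
public class DemoMetrics {
  // Published as "WidgetsProcessed" once the source is registered.
  @Metric("Number of widgets processed")
  MutableCounterLong widgetsProcessed;

  public void incWidgetsProcessed() {
    widgetsProcessed.incr();
  }

  public static DemoMetrics create() {
    // Registering the annotated object instantiates the @Metric fields and
    // makes them visible to metrics sinks and JMX.
    return DefaultMetricsSystem.instance()
        .register("DemoMetrics", "Demo metrics source", new DemoMetrics());
  }
}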

TestPendingReplication.java

@@ -17,13 +17,21 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -44,6 +52,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -472,4 +482,80 @@ public class TestPendingReplication {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test the metric counters of the re-replication process.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test(timeout = 300000)
+  public void testReplicationCounter() throws IOException,
+      InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
+    MiniDFSCluster tmpCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATANODE_COUNT).build();
+    tmpCluster.waitActive();
+    FSNamesystem fsn = tmpCluster.getNamesystem(0);
+    fsn.writeLock();
+
+    try {
+      BlockManager bm = fsn.getBlockManager();
+      BlocksMap blocksMap = bm.blocksMap;
+
+      // Create three blockInfos below: blockInfo0 will succeed, blockInfo1
+      // will time out, and blockInfo2 will fail the replication.
+      BlockCollection bc0 = Mockito.mock(BlockCollection.class);
+      BlockInfo blockInfo0 = new BlockInfoContiguous((short) 3);
+      blockInfo0.setBlockId(0);
+      BlockCollection bc1 = Mockito.mock(BlockCollection.class);
+      BlockInfo blockInfo1 = new BlockInfoContiguous((short) 3);
+      blockInfo1.setBlockId(1);
+      BlockCollection bc2 = Mockito.mock(BlockCollection.class);
+      BlockInfo blockInfo2 = new BlockInfoContiguous((short) 3);
+      blockInfo2.setBlockId(2);
+      blocksMap.addBlockCollection(blockInfo0, bc0);
+      blocksMap.addBlockCollection(blockInfo1, bc1);
+      blocksMap.addBlockCollection(blockInfo2, bc2);
+
+      PendingReplicationBlocks pending = bm.pendingReplications;
+      MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+      assertCounter("SuccessfulReReplications", 0L, rb);
+      assertCounter("NumTimesReReplicationNotScheduled", 0L, rb);
+      assertCounter("TimeoutReReplications", 0L, rb);
+
+      // Add block0 and block1 to the pending queue.
+      pending.increment(blockInfo0);
+      pending.increment(blockInfo1);
+
+      // Calling addBlock on block0 will mark its re-replication successful;
+      // not calling addBlock on block1 will make it time out later.
+      DatanodeStorageInfo[] storageInfos =
+          DFSTestUtil.createDatanodeStorageInfos(1);
+      bm.addBlock(storageInfos[0], blockInfo0, null);
+
+      // Calling scheduleReplication on blockInfo2 will fail to schedule a
+      // re-replication because there is no source node to replicate from.
+      bm.scheduleReplication(blockInfo2, 0);
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+          return getLongCounter("SuccessfulReReplications", rb) == 1 &&
+              getLongCounter("NumTimesReReplicationNotScheduled", rb) == 1 &&
+              getLongCounter("TimeoutReReplications", rb) == 1;
+        }
+      }, 100, 60000);
+    } finally {
+      tmpCluster.shutdown();
+      fsn.writeUnlock();
+    }
+  }
 }