HDFS-6682. Add a metric to expose the timestamp of the oldest under-replicated block. (aajisaka)

Akira Ajisaka 2015-07-24 11:37:23 +09:00
parent ab3197c204
commit 02c01815ec
6 changed files with 93 additions and 5 deletions

View File

@ -201,6 +201,7 @@ Each metrics record contains tags such as HAState and Hostname as additional inf
| Name | Description |
|:---- |:---- |
| `MissingBlocks` | Current number of missing blocks |
| `TimeOfTheOldestBlockToBeReplicated` | The timestamp, in milliseconds since the epoch, of the oldest block to be replicated. If there are no under-replicated or corrupt blocks, the value is 0. |
| `ExpiredHeartbeats` | Total number of expired heartbeats |
| `TransactionsSinceLastCheckpoint` | Total number of transactions since last checkpoint |
| `TransactionsSinceLastLogRoll` | Total number of transactions since last edit log roll |
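
Once the NameNode is up, this gauge can be read like any other FSNamesystem metric, for example through the NameNode's `/jmx` servlet. A minimal sketch, assuming the default 2.x NameNode HTTP port (adjust host and port for your cluster):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class OldestBlockGaugeReader {
  public static void main(String[] args) throws Exception {
    // FSNamesystem gauges are exposed under this JMX bean name.
    URL url = new URL("http://localhost:50070/jmx"
        + "?qry=Hadoop:service=NameNode,name=FSNamesystem");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        // Grep the JSON response for the new gauge.
        if (line.contains("TimeOfTheOldestBlockToBeReplicated")) {
          System.out.println(line.trim());
        }
      }
    }
  }
}
```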

View File

@ -747,6 +747,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8730. Clean up the import statements in ClientProtocol.
(Takanobu Asanuma via wheat9)
HDFS-6682. Add a metric to expose the timestamp of the oldest
under-replicated block. (aajisaka)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -171,6 +171,10 @@ public class BlockManager implements BlockStatsMXBean {
public int getPendingDataNodeMessageCount() {
return pendingDNMessages.count();
}
/** Used by metrics. */
public long getTimeOfTheOldestBlockToBeReplicated() {
return neededReplications.getTimeOfTheOldestBlockToBeReplicated();
}
/** replicationRecheckInterval is how often the namenode checks for new replication work */
private final long replicationRecheckInterval;

View File

@ -18,10 +18,15 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Time;
/**
* Keep prioritized queues of under-replicated blocks.
@ -82,6 +87,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
/** The number of corrupt blocks with replication factor 1 */
private int corruptReplOneBlocks = 0;
/** Keep timestamp when a block is put into the queue. */
private final Map<BlockInfo, Long> timestampsMap =
Collections.synchronizedMap(new LinkedHashMap<BlockInfo, Long>());
/** Create an object. */
UnderReplicatedBlocks() {
@ -91,12 +99,13 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
}
/**
* Empty the queues and timestamps.
*/
void clear() {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.get(i).clear();
}
timestampsMap.clear();
}
/** Return the total number of under-replicated blocks */
@ -119,6 +128,20 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
return size;
}
/**
* Return the smallest timestamp of the under-replicated/corrupt blocks.
* If there are no under-replicated or corrupt blocks, return 0.
*/
long getTimeOfTheOldestBlockToBeReplicated() {
synchronized (timestampsMap) {
if (timestampsMap.isEmpty()) {
return 0;
}
// The map is a LinkedHashMap iterated in insertion order, so the
// first entry holds the oldest timestamp.
return timestampsMap.entrySet().iterator().next().getValue();
}
}
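
The oldest-first lookup above leans on `LinkedHashMap` iteration order: entries come back in insertion order, and a key that is removed and later re-put moves to the tail. A small standalone sketch of that behavior (plain JDK, no Hadoop types):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class InsertionOrderDemo {
  public static void main(String[] args) {
    // Same shape as timestampsMap: a synchronized, insertion-ordered map.
    Map<String, Long> timestamps =
        Collections.synchronizedMap(new LinkedHashMap<String, Long>());
    timestamps.put("block1", 100L);
    timestamps.put("block2", 200L);

    // Iterating a Collections.synchronizedMap view must be manually
    // synchronized, which is why the getter above locks timestampsMap.
    synchronized (timestamps) {
      // Prints block1=100: the first entry is the oldest insertion.
      System.out.println(timestamps.entrySet().iterator().next());
    }

    // Remove the head, then re-insert the key: it is appended at the
    // tail, so block2=200 becomes the oldest entry.
    timestamps.remove("block1");
    timestamps.put("block1", 300L);
    synchronized (timestamps) {
      // Prints block2=200.
      System.out.println(timestamps.entrySet().iterator().next());
    }
  }
}
```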
/** Return the number of corrupt blocks */
synchronized int getCorruptBlockSize() {
return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
@ -197,7 +220,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
+ " has only {} replicas and need {} replicas so is added to" +
" neededReplications at priority level {}", block, curReplicas,
expectedReplicas, priLevel);
timestampsMap.put(block, Time.now());
return true;
}
return false;
@ -242,8 +265,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
if(priLevel >= 0 && priLevel < LEVEL
&& priorityQueues.get(priLevel).remove(block)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
" from priority queue {}", block, priLevel);
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
" from priority queue {}", block, priLevel);
timestampsMap.remove(block);
return true;
} else {
// Try to remove the block from all queues if the block was
@ -253,6 +277,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
" {} from priority queue {}", block, priLevel);
timestampsMap.remove(block);
return true;
}
}

View File

@ -3770,7 +3770,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// not locking
return blockManager.getMissingReplOneBlocksCount();
}
@Metric({"TimeOfTheOldestBlockToBeReplicated",
"The timestamp of the oldest block to be replicated. If there are no" +
"under-replicated or corrupt blocks, return 0."})
public long getTimeOfTheOldestBlockToBeReplicated() {
return blockManager.getTimeOfTheOldestBlockToBeReplicated();
}
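
No further wiring is needed for the new gauge: Hadoop's metrics2 system finds `@Metric`-annotated getters on a registered source by reflection. A minimal sketch of that pattern; the source class and names below are hypothetical, not part of this change:

```java
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

// Hypothetical metrics source illustrating the pattern FSNamesystem uses.
@Metrics(name = "ExampleSource", about = "Demo source", context = "dfs")
public class ExampleSource {
  private volatile long oldestTimestamp = 0;

  // metrics2 publishes this annotated getter as a long gauge named
  // "OldestTimestamp" with the given description.
  @Metric({"OldestTimestamp", "Timestamp of the oldest queued item"})
  public long getOldestTimestamp() {
    return oldestTimestamp;
  }

  public static void main(String[] args) {
    DefaultMetricsSystem.initialize("ExampleDaemon");
    DefaultMetricsSystem.instance().register(
        "ExampleSource", "Demo source", new ExampleSource());
  }
}
```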
@Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
public int getExpiredHeartbeats() {
return datanodeStatistics.getExpiredHeartbeats();

View File

@ -28,8 +28,10 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import java.util.Iterator;
@ -146,4 +148,50 @@ public class TestUnderReplicatedBlocks {
}
@Test
public void testGetTimeOfTheOldestBlockToBeReplicated() {
UnderReplicatedBlocks blocks = new UnderReplicatedBlocks();
BlockInfo block1 = new BlockInfoContiguous(new Block(1), (short) 1);
BlockInfo block2 = new BlockInfoContiguous(new Block(2), (short) 1);
// if there are no under-replicated or corrupt blocks, return 0
assertEquals(0L, blocks.getTimeOfTheOldestBlockToBeReplicated());
// add block1, add block2, remove block1, remove block2
long time1 = Time.now();
blocks.add(block1, 1, 0, 3);
long time2 = Time.now();
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
blocks.add(block2, 2, 0, 3);
long time3 = Time.now();
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
blocks.remove(block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time2);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time3);
blocks.remove(block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
assertEquals(0L, blocks.getTimeOfTheOldestBlockToBeReplicated());
// add block2, add block1, remove block1, remove block2
time1 = Time.now();
blocks.add(block2, 2, 0, 3);
time2 = Time.now();
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
blocks.add(block1, 1, 0, 3);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
blocks.remove(block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
blocks.remove(block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
assertEquals(0L, blocks.getTimeOfTheOldestBlockToBeReplicated());
}
}
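
The unit test exercises the queue directly; the gauge could also be checked end to end through the registered FSNamesystem source. A hedged sketch using Hadoop's `MetricsAsserts` test helper, assuming it is available on the test classpath as in other NameNode metric tests:

```java
import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;

public class TestOldestBlockGauge {
  @Test
  public void testGaugeIsZeroOnHealthyCluster() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      // With no under-replicated or corrupt blocks the gauge reads 0.
      assertEquals(0L, getLongGauge("TimeOfTheOldestBlockToBeReplicated",
          getMetrics("FSNamesystem")));
    } finally {
      cluster.shutdown();
    }
  }
}
```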