diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fcaabc6a166..2d48b8acc7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -704,6 +704,9 @@ Release 0.23.0 - Unreleased
HDFS-2471. Add federation documentation. (suresh)
+ HDFS-2485. Improve code layout and constants in UnderReplicatedBlocks
+ (stevel)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 7b39860d06f..dc8d9e8db37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -26,19 +26,66 @@ import java.util.TreeSet;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-/** Keep track of under replication blocks.
- * Blocks have replication priority, with priority 0 indicating the highest
- * Blocks have only one replicas has the highest
+/**
+ * Keep prioritized queues of under replicated blocks.
+ * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
+ * indicating the highest priority.
+ *
+ * Having a prioritised queue allows the {@link BlockManager} to select
+ * which blocks to replicate first -it tries to give priority to data
+ * that is most at risk or considered most valuable.
+ *
+ *
+ * The policy for choosing which priority to give added blocks
+ * is implemented in {@link #getPriority(Block, int, int, int)}.
+ *
+ * The queue order is as follows:
+ *
+ * - {@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
+ * first. That is blocks with only one copy, or blocks with zero live
+ * copies but a copy in a node being decommissioned. These blocks
+ * are at risk of loss if the disk or server on which they
+ * remain fails.
+ * - {@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
+ * under-replicated compared to their expected values. Currently
+ * that means the ratio of the ratio of actual:expected means that
+ * there is less than 1:3.
. These blocks may not be at risk,
+ * but they are clearly considered "important".
+ * - {@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
+ * replicated, and the ratio of actual:expected is good enough that
+ * they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
+ * queue.
+ * - {@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as
+ * many copies of a block as required, but the blocks are not adequately
+ * distributed. Loss of a rack/switch could take all copies off-line.
+ * - {@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
+ * and for which there are no-non-corrupt copies (currently) available.
+ * The policy here is to keep those corrupt blocks replicated, but give
+ * blocks that are not corrupt higher priority.
+ *
*/
class UnderReplicatedBlocks implements Iterable {
+ /** The total number of queues : {@value} */
static final int LEVEL = 5;
+ /** The queue with the highest priority: {@value} */
+ static final int QUEUE_HIGHEST_PRIORITY = 0;
+ /** The queue for blocks that are way below their expected value : {@value} */
+ static final int QUEUE_VERY_UNDER_REPLICATED = 1;
+ /** The queue for "normally" under-replicated blocks: {@value} */
+ static final int QUEUE_UNDER_REPLICATED = 2;
+ /** The queue for blocks that have the right number of replicas,
+ * but which the block manager felt were badly distributed: {@value}
+ */
+ static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
+ /** The queue for corrupt blocks: {@value} */
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
+ /** the queues themselves */
private final List> priorityQueues
- = new ArrayList>();
-
+ = new ArrayList>(LEVEL);
+
/** Create an object. */
UnderReplicatedBlocks() {
- for(int i=0; i());
}
}
@@ -47,7 +94,7 @@ class UnderReplicatedBlocks implements Iterable {
* Empty the queues.
*/
void clear() {
- for(int i=0; i {
/** Return the total number of under replication blocks */
synchronized int size() {
int size = 0;
- for (int i=0; i {
/** Return the number of under replication blocks excluding corrupt blocks */
synchronized int getUnderReplicatedBlockCount() {
int size = 0;
- for (int i=0; i {
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
- for(NavigableSet set : priorityQueues) {
- if(set.contains(block)) { return true; }
+ for (NavigableSet set : priorityQueues) {
+ if (set.contains(block)) {
+ return true;
+ }
}
return false;
}
-
+
/** Return the priority of a block
- * @param block a under replication block
+ * @param block a under replicated block
* @param curReplicas current number of replicas of the block
* @param expectedReplicas expected number of replicas of the block
+ * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
*/
- private int getPriority(Block block,
+ private int getPriority(Block block,
int curReplicas,
int decommissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
if (curReplicas >= expectedReplicas) {
- return 3; // Block doesn't have enough racks
- } else if(curReplicas==0) {
- // If there are zero non-decommissioned replica but there are
+ // Block has enough copies, but not enough racks
+ return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
+ } else if (curReplicas == 0) {
+ // If there are zero non-decommissioned replicas but there are
// some decommissioned replicas, then assign them highest priority
if (decommissionedReplicas > 0) {
- return 0;
+ return QUEUE_HIGHEST_PRIORITY;
}
- return QUEUE_WITH_CORRUPT_BLOCKS; // keep these blocks in needed replication.
- } else if(curReplicas==1) {
- return 0; // highest priority
- } else if(curReplicas*3 {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.add:"
+ block
- + " has only "+curReplicas
+ + " has only " + curReplicas
+ " replicas and need " + expectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + priLevel);
@@ -149,8 +209,22 @@ class UnderReplicatedBlocks implements Iterable {
oldExpectedReplicas);
return remove(block, priLevel);
}
-
- /** remove a block from a under replication queue given a priority*/
+
+ /**
+ * Remove a block from the under replication queues.
+ *
+ * The priLevel parameter is a hint of which queue to query
+ * first: if negative or >= {@link #LEVEL} this shortcutting
+ * is not attmpted.
+ *
+ * If the block is not found in the nominated queue, an attempt is made to
+ * remove it from all queues.
+ *
+ * Warning: This is not a synchronized method.
+ * @param block block to remove
+ * @param priLevel expected privilege level
+ * @return true if the block was found and removed from one of the priority queues
+ */
boolean remove(Block block, int priLevel) {
if(priLevel >= 0 && priLevel < LEVEL
&& priorityQueues.get(priLevel).remove(block)) {
@@ -164,8 +238,8 @@ class UnderReplicatedBlocks implements Iterable {
} else {
// Try to remove the block from all queues if the block was
// not found in the queue for the given priority level.
- for(int i=0; i {
}
return false;
}
-
- /** update the priority level of a block */
- synchronized void update(Block block, int curReplicas,
+
+ /**
+ * Recalculate and potentially update the priority level of a block.
+ *
+ * If the block priority has changed from before an attempt is made to
+ * remove it from the block queue. Regardless of whether or not the block
+ * is in the block queue of (recalculate) priority, an attempt is made
+ * to add it to that queue. This ensures that the block will be
+ * in its expected priority queue (and only that queue) by the end of the
+ * method call.
+ * @param block a under replicated block
+ * @param curReplicas current number of replicas of the block
+ * @param decommissionedReplicas the number of decommissioned replicas
+ * @param curExpectedReplicas expected number of replicas of the block
+ * @param curReplicasDelta the change in the replicate count from before
+ * @param expectedReplicasDelta the change in the expected replica count from before
+ */
+ synchronized void update(Block block, int curReplicas,
int decommissionedReplicas,
int curExpectedReplicas,
int curReplicasDelta, int expectedReplicasDelta) {
@@ -206,7 +295,7 @@ class UnderReplicatedBlocks implements Iterable {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.update:"
+ block
- + " has only "+curReplicas
+ + " has only "+ curReplicas
+ " replicas and needs " + curExpectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + curPri);
@@ -218,17 +307,24 @@ class UnderReplicatedBlocks implements Iterable {
synchronized BlockIterator iterator(int level) {
return new BlockIterator(level);
}
-
+
/** return an iterator of all the under replication blocks */
+ @Override
public synchronized BlockIterator iterator() {
return new BlockIterator();
}
-
+
+ /**
+ * An iterator over blocks.
+ */
class BlockIterator implements Iterator {
private int level;
private boolean isIteratorForLevel = false;
private List> iterators = new ArrayList>();
+ /**
+ * Construct an iterator over all queues.
+ */
private BlockIterator() {
level=0;
for(int i=0; i {
}
}
+ /**
+ * Constrict an iterator for a single queue level
+ * @param l the priority level to iterate over
+ */
private BlockIterator(int l) {
level = l;
isIteratorForLevel = true;
@@ -243,8 +343,9 @@ class UnderReplicatedBlocks implements Iterable {
}
private void update() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
return;
+ }
while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
level++;
}
@@ -252,30 +353,33 @@ class UnderReplicatedBlocks implements Iterable {
@Override
public Block next() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
return iterators.get(0).next();
+ }
update();
return iterators.get(level).next();
}
@Override
public boolean hasNext() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
return iterators.get(0).hasNext();
+ }
update();
return iterators.get(level).hasNext();
}
@Override
public void remove() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
iterators.get(0).remove();
- else
+ } else {
iterators.get(level).remove();
+ }
}
int getPriority() {
return level;
}
- }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
new file mode 100644
index 00000000000..20c2541119c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnderReplicatedBlockQueues extends Assert {
+
+ /**
+ * Test that adding blocks with different replication counts puts them
+ * into different queues
+ * @throws Throwable if something goes wrong
+ */
+ @Test
+ public void testBlockPriorities() throws Throwable {
+ UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
+ Block block1 = new Block(1);
+ Block block2 = new Block(2);
+ Block block_very_under_replicated = new Block(3);
+ Block block_corrupt = new Block(4);
+
+ //add a block with a single entry
+ assertAdded(queues, block1, 1, 0, 3);
+
+ assertEquals(1, queues.getUnderReplicatedBlockCount());
+ assertEquals(1, queues.size());
+ assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+ //repeated additions fail
+ assertFalse(queues.add(block1, 1, 0, 3));
+
+ //add a second block with two replicas
+ assertAdded(queues, block2, 2, 0, 3);
+ assertEquals(2, queues.getUnderReplicatedBlockCount());
+ assertEquals(2, queues.size());
+ assertInLevel(queues, block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+ //now try to add a block that is corrupt
+ assertAdded(queues, block_corrupt, 0, 0, 3);
+ assertEquals(3, queues.size());
+ assertEquals(2, queues.getUnderReplicatedBlockCount());
+ assertEquals(1, queues.getCorruptBlockSize());
+ assertInLevel(queues, block_corrupt,
+ UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+
+ //insert a very under-replicated block
+ assertAdded(queues, block_very_under_replicated, 4, 0, 25);
+ assertInLevel(queues, block_very_under_replicated,
+ UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
+
+ }
+
+ private void assertAdded(UnderReplicatedBlocks queues,
+ Block block,
+ int curReplicas,
+ int decomissionedReplicas,
+ int expectedReplicas) {
+ assertTrue("Failed to add " + block,
+ queues.add(block,
+ curReplicas,
+ decomissionedReplicas,
+ expectedReplicas));
+ }
+
+ /**
+ * Determine whether or not a block is in a level without changing the API.
+ * Instead get the per-level iterator and run though it looking for a match.
+ * If the block is not found, an assertion is thrown.
+ *
+ * This is inefficient, but this is only a test case.
+ * @param queues queues to scan
+ * @param block block to look for
+ * @param level level to select
+ */
+ private void assertInLevel(UnderReplicatedBlocks queues,
+ Block block,
+ int level) {
+ UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level);
+ while (bi.hasNext()) {
+ Block next = bi.next();
+ if (block.equals(next)) {
+ return;
+ }
+ }
+ fail("Block " + block + " not found in level " + level);
+ }
+}