HDFS-2485

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1187889 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Steve Loughran 2011-10-23 14:15:09 +00:00
parent b9e90a3319
commit ca1aa84fd5
3 changed files with 256 additions and 46 deletions

View File

@ -704,6 +704,9 @@ Release 0.23.0 - Unreleased
HDFS-2471. Add federation documentation. (suresh)
HDFS-2485. Improve code layout and constants in UnderReplicatedBlocks
(stevel)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -26,19 +26,66 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
/** Keep track of under replication blocks.
* Blocks have replication priority, with priority 0 indicating the highest
* Blocks have only one replicas has the highest
/**
* Keep prioritized queues of under replicated blocks.
* Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
* indicating the highest priority.
* </p>
* Having a prioritised queue allows the {@link BlockManager} to select
* which blocks to replicate first -it tries to give priority to data
* that is most at risk or considered most valuable.
*
* <p/>
* The policy for choosing which priority to give added blocks
* is implemented in {@link #getPriority(Block, int, int, int)}.
* </p>
* <p>The queue order is as follows:</p>
* <ol>
* <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
* first. That is blocks with only one copy, or blocks with zero live
* copies but a copy in a node being decommissioned. These blocks
* are at risk of loss if the disk or server on which they
* remain fails.</li>
* <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
* under-replicated compared to their expected values. Currently
* that means the ratio of the ratio of actual:expected means that
* there is <i>less than</i> 1:3.</li>. These blocks may not be at risk,
* but they are clearly considered "important".
* <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
* replicated, and the ratio of actual:expected is good enough that
* they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
* queue.</li>
* <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as
* many copies of a block as required, but the blocks are not adequately
* distributed. Loss of a rack/switch could take all copies off-line.</li>
* <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
* and for which there are no-non-corrupt copies (currently) available.
* The policy here is to keep those corrupt blocks replicated, but give
* blocks that are not corrupt higher priority.</li>
* </ol>
*/
class UnderReplicatedBlocks implements Iterable<Block> {
/** The total number of queues : {@value} */
static final int LEVEL = 5;
/** The queue with the highest priority: {@value} */
static final int QUEUE_HIGHEST_PRIORITY = 0;
/** The queue for blocks that are way below their expected value : {@value} */
static final int QUEUE_VERY_UNDER_REPLICATED = 1;
/** The queue for "normally" under-replicated blocks: {@value} */
static final int QUEUE_UNDER_REPLICATED = 2;
/** The queue for blocks that have the right number of replicas,
* but which the block manager felt were badly distributed: {@value}
*/
static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
/** The queue for corrupt blocks: {@value} */
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
/** the queues themselves */
private final List<NavigableSet<Block>> priorityQueues
= new ArrayList<NavigableSet<Block>>();
= new ArrayList<NavigableSet<Block>>(LEVEL);
/** Create an object. */
UnderReplicatedBlocks() {
for(int i=0; i<LEVEL; i++) {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new TreeSet<Block>());
}
}
@ -47,7 +94,7 @@ class UnderReplicatedBlocks implements Iterable<Block> {
* Empty the queues.
*/
void clear() {
for(int i=0; i<LEVEL; i++) {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.get(i).clear();
}
}
@ -55,7 +102,7 @@ void clear() {
/** Return the total number of under replication blocks */
synchronized int size() {
int size = 0;
for (int i=0; i<LEVEL; i++) {
for (int i = 0; i < LEVEL; i++) {
size += priorityQueues.get(i).size();
}
return size;
@ -64,12 +111,14 @@ synchronized int size() {
/** Return the number of under replication blocks excluding corrupt blocks */
synchronized int getUnderReplicatedBlockCount() {
int size = 0;
for (int i=0; i<QUEUE_WITH_CORRUPT_BLOCKS; i++) {
size += priorityQueues.get(i).size();
for (int i = 0; i < LEVEL; i++) {
if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
size += priorityQueues.get(i).size();
}
}
return size;
}
/** Return the number of corrupt blocks */
synchronized int getCorruptBlockSize() {
return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
@ -77,47 +126,58 @@ synchronized int getCorruptBlockSize() {
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
for(NavigableSet<Block> set : priorityQueues) {
if(set.contains(block)) { return true; }
for (NavigableSet<Block> set : priorityQueues) {
if (set.contains(block)) {
return true;
}
}
return false;
}
/** Return the priority of a block
* @param block a under replication block
* @param block a under replicated block
* @param curReplicas current number of replicas of the block
* @param expectedReplicas expected number of replicas of the block
* @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
*/
private int getPriority(Block block,
private int getPriority(Block block,
int curReplicas,
int decommissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
if (curReplicas >= expectedReplicas) {
return 3; // Block doesn't have enough racks
} else if(curReplicas==0) {
// If there are zero non-decommissioned replica but there are
// Block has enough copies, but not enough racks
return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
} else if (curReplicas == 0) {
// If there are zero non-decommissioned replicas but there are
// some decommissioned replicas, then assign them highest priority
if (decommissionedReplicas > 0) {
return 0;
return QUEUE_HIGHEST_PRIORITY;
}
return QUEUE_WITH_CORRUPT_BLOCKS; // keep these blocks in needed replication.
} else if(curReplicas==1) {
return 0; // highest priority
} else if(curReplicas*3<expectedReplicas) {
return 1;
//all we have are corrupt blocks
return QUEUE_WITH_CORRUPT_BLOCKS;
} else if (curReplicas == 1) {
//only on replica -risk of loss
// highest priority
return QUEUE_HIGHEST_PRIORITY;
} else if ((curReplicas * 3) < expectedReplicas) {
//there is less than a third as many blocks as requested;
//this is considered very under-replicated
return QUEUE_VERY_UNDER_REPLICATED;
} else {
return 2;
//add to the normal queue for under replicated blocks
return QUEUE_UNDER_REPLICATED;
}
}
/** add a block to a under replication queue according to its priority
* @param block a under replication block
* @param curReplicas current number of replicas of the block
* @param decomissionedReplicas the number of decommissioned replicas
* @param expectedReplicas expected number of replicas of the block
* @return true if the block was added to a queue.
*/
synchronized boolean add(
Block block,
synchronized boolean add(Block block,
int curReplicas,
int decomissionedReplicas,
int expectedReplicas) {
@ -129,7 +189,7 @@ synchronized boolean add(
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.add:"
+ block
+ " has only "+curReplicas
+ " has only " + curReplicas
+ " replicas and need " + expectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + priLevel);
@ -149,8 +209,22 @@ synchronized boolean remove(Block block,
oldExpectedReplicas);
return remove(block, priLevel);
}
/** remove a block from a under replication queue given a priority*/
/**
* Remove a block from the under replication queues.
*
* The priLevel parameter is a hint of which queue to query
* first: if negative or &gt;= {@link #LEVEL} this shortcutting
* is not attmpted.
*
* If the block is not found in the nominated queue, an attempt is made to
* remove it from all queues.
*
* <i>Warning:</i> This is not a synchronized method.
* @param block block to remove
* @param priLevel expected privilege level
* @return true if the block was found and removed from one of the priority queues
*/
boolean remove(Block block, int priLevel) {
if(priLevel >= 0 && priLevel < LEVEL
&& priorityQueues.get(priLevel).remove(block)) {
@ -164,8 +238,8 @@ boolean remove(Block block, int priLevel) {
} else {
// Try to remove the block from all queues if the block was
// not found in the queue for the given priority level.
for(int i=0; i<LEVEL; i++) {
if(priorityQueues.get(i).remove(block)) {
for (int i = 0; i < LEVEL; i++) {
if (priorityQueues.get(i).remove(block)) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: "
@ -178,9 +252,24 @@ boolean remove(Block block, int priLevel) {
}
return false;
}
/** update the priority level of a block */
synchronized void update(Block block, int curReplicas,
/**
* Recalculate and potentially update the priority level of a block.
*
* If the block priority has changed from before an attempt is made to
* remove it from the block queue. Regardless of whether or not the block
* is in the block queue of (recalculate) priority, an attempt is made
* to add it to that queue. This ensures that the block will be
* in its expected priority queue (and only that queue) by the end of the
* method call.
* @param block a under replicated block
* @param curReplicas current number of replicas of the block
* @param decommissionedReplicas the number of decommissioned replicas
* @param curExpectedReplicas expected number of replicas of the block
* @param curReplicasDelta the change in the replicate count from before
* @param expectedReplicasDelta the change in the expected replica count from before
*/
synchronized void update(Block block, int curReplicas,
int decommissionedReplicas,
int curExpectedReplicas,
int curReplicasDelta, int expectedReplicasDelta) {
@ -206,7 +295,7 @@ synchronized void update(Block block, int curReplicas,
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.update:"
+ block
+ " has only "+curReplicas
+ " has only "+ curReplicas
+ " replicas and needs " + curExpectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + curPri);
@ -218,17 +307,24 @@ synchronized void update(Block block, int curReplicas,
synchronized BlockIterator iterator(int level) {
return new BlockIterator(level);
}
/** return an iterator of all the under replication blocks */
@Override
public synchronized BlockIterator iterator() {
return new BlockIterator();
}
/**
* An iterator over blocks.
*/
class BlockIterator implements Iterator<Block> {
private int level;
private boolean isIteratorForLevel = false;
private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
/**
* Construct an iterator over all queues.
*/
private BlockIterator() {
level=0;
for(int i=0; i<LEVEL; i++) {
@ -236,6 +332,10 @@ private BlockIterator() {
}
}
/**
* Constrict an iterator for a single queue level
* @param l the priority level to iterate over
*/
private BlockIterator(int l) {
level = l;
isIteratorForLevel = true;
@ -243,8 +343,9 @@ private BlockIterator(int l) {
}
private void update() {
if (isIteratorForLevel)
if (isIteratorForLevel) {
return;
}
while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
level++;
}
@ -252,30 +353,33 @@ private void update() {
@Override
public Block next() {
if (isIteratorForLevel)
if (isIteratorForLevel) {
return iterators.get(0).next();
}
update();
return iterators.get(level).next();
}
@Override
public boolean hasNext() {
if (isIteratorForLevel)
if (isIteratorForLevel) {
return iterators.get(0).hasNext();
}
update();
return iterators.get(level).hasNext();
}
@Override
public void remove() {
if (isIteratorForLevel)
if (isIteratorForLevel) {
iterators.get(0).remove();
else
} else {
iterators.get(level).remove();
}
}
int getPriority() {
return level;
}
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.junit.Assert;
import org.junit.Test;
public class TestUnderReplicatedBlockQueues extends Assert {
/**
* Test that adding blocks with different replication counts puts them
* into different queues
* @throws Throwable if something goes wrong
*/
@Test
public void testBlockPriorities() throws Throwable {
UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
Block block1 = new Block(1);
Block block2 = new Block(2);
Block block_very_under_replicated = new Block(3);
Block block_corrupt = new Block(4);
//add a block with a single entry
assertAdded(queues, block1, 1, 0, 3);
assertEquals(1, queues.getUnderReplicatedBlockCount());
assertEquals(1, queues.size());
assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
//repeated additions fail
assertFalse(queues.add(block1, 1, 0, 3));
//add a second block with two replicas
assertAdded(queues, block2, 2, 0, 3);
assertEquals(2, queues.getUnderReplicatedBlockCount());
assertEquals(2, queues.size());
assertInLevel(queues, block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
//now try to add a block that is corrupt
assertAdded(queues, block_corrupt, 0, 0, 3);
assertEquals(3, queues.size());
assertEquals(2, queues.getUnderReplicatedBlockCount());
assertEquals(1, queues.getCorruptBlockSize());
assertInLevel(queues, block_corrupt,
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
//insert a very under-replicated block
assertAdded(queues, block_very_under_replicated, 4, 0, 25);
assertInLevel(queues, block_very_under_replicated,
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
}
private void assertAdded(UnderReplicatedBlocks queues,
Block block,
int curReplicas,
int decomissionedReplicas,
int expectedReplicas) {
assertTrue("Failed to add " + block,
queues.add(block,
curReplicas,
decomissionedReplicas,
expectedReplicas));
}
/**
* Determine whether or not a block is in a level without changing the API.
* Instead get the per-level iterator and run though it looking for a match.
* If the block is not found, an assertion is thrown.
*
* This is inefficient, but this is only a test case.
* @param queues queues to scan
* @param block block to look for
* @param level level to select
*/
private void assertInLevel(UnderReplicatedBlocks queues,
Block block,
int level) {
UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level);
while (bi.hasNext()) {
Block next = bi.next();
if (block.equals(next)) {
return;
}
}
fail("Block " + block + " not found in level " + level);
}
}