From 32c810c8192587d116c92b7ed5affbcca212118a Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 15 Oct 2015 18:07:09 +0800 Subject: [PATCH] HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../server/blockmanagement/BlockManager.java | 35 ++- .../blockmanagement/DecommissionManager.java | 2 +- .../blockmanagement/NumberReplicas.java | 18 +- .../UnderReplicatedBlocks.java | 203 ++++++------------ .../TestReplicationPolicy.java | 71 +++--- .../TestUnderReplicatedBlockQueues.java | 14 +- 7 files changed, 147 insertions(+), 198 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 28c96ebfd3c..28709c23ca6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -704,6 +704,8 @@ Release 2.8.0 - UNRELEASED HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei) + HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index d2fedd2e66a..de13ff396e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -751,6 +751,7 @@ public class BlockManager implements BlockStatsMXBean { // Remove block from replication queue. 
NumberReplicas replicas = countNodes(lastBlock); neededReplications.remove(lastBlock, replicas.liveReplicas(), + replicas.readOnlyReplicas(), replicas.decommissionedAndDecommissioning(), getReplication(lastBlock)); pendingReplications.remove(lastBlock); @@ -1650,6 +1651,7 @@ public class BlockManager implements BlockStatsMXBean { nodesContainingLiveReplicas.clear(); DatanodeDescriptor srcNode = null; int live = 0; + int readonly = 0; int decommissioned = 0; int decommissioning = 0; int corrupt = 0; @@ -1673,6 +1675,9 @@ public class BlockManager implements BlockStatsMXBean { nodesContainingLiveReplicas.add(storage); live += countableReplica; } + if (storage.getState() == State.READ_ONLY_SHARED) { + readonly++; + } containingNodes.add(node); // Check if this replica is corrupt // If so, do not select the node as src node @@ -1707,7 +1712,7 @@ public class BlockManager implements BlockStatsMXBean { srcNode = node; } if(numReplicas != null) - numReplicas.initialize(live, decommissioned, decommissioning, corrupt, + numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt, excess, 0); return srcNode; } @@ -1732,7 +1737,7 @@ public class BlockManager implements BlockStatsMXBean { } NumberReplicas num = countNodes(timedOutItems[i]); if (isNeededReplication(bi, num.liveReplicas())) { - neededReplications.add(bi, num.liveReplicas(), + neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), getReplication(bi)); } } @@ -2614,6 +2619,7 @@ public class BlockManager implements BlockStatsMXBean { short fileReplication = getExpectedReplicaNum(storedBlock); if (!isNeededReplication(storedBlock, numCurrentReplica)) { neededReplications.remove(storedBlock, numCurrentReplica, + num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), fileReplication); } else { updateNeededReplications(storedBlock, curReplicaDelta, 0); @@ -2846,8 +2852,8 @@ public class BlockManager implements BlockStatsMXBean { int 
numCurrentReplica = num.liveReplicas(); // add to under-replicated queue if need to be if (isNeededReplication(block, numCurrentReplica)) { - if (neededReplications.add(block, numCurrentReplica, num - .decommissionedAndDecommissioning(), expectedReplication)) { + if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(), + num.decommissionedAndDecommissioning(), expectedReplication)) { return MisReplicationResult.UNDER_REPLICATED; } } @@ -3280,15 +3286,22 @@ public class BlockManager implements BlockStatsMXBean { * Return the number of nodes hosting a given block, grouped * by the state of those replicas. */ - public NumberReplicas countNodes(BlockInfo b) { + public NumberReplicas countNodes(Block b) { int decommissioned = 0; int decommissioning = 0; int live = 0; + int readonly = 0; int corrupt = 0; int excess = 0; int stale = 0; Collection nodesCorrupt = corruptReplicas.getNodes(b); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + if (storage.getState() == State.FAILED) { + continue; + } else if (storage.getState() == State.READ_ONLY_SHARED) { + readonly++; + continue; + } final DatanodeDescriptor node = storage.getDatanodeDescriptor(); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { corrupt++; @@ -3309,7 +3322,8 @@ public class BlockManager implements BlockStatsMXBean { stale++; } } - return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale); + return new NumberReplicas(live, readonly, decommissioned, decommissioning, + corrupt, excess, stale); } /** @@ -3444,13 +3458,13 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas repl = countNodes(block); int curExpectedReplicas = getReplication(block); if (isNeededReplication(block, repl.liveReplicas())) { - neededReplications.update(block, repl.liveReplicas(), repl - .decommissionedAndDecommissioning(), curExpectedReplicas, + 
neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(), + repl.decommissionedAndDecommissioning(), curExpectedReplicas, curReplicasDelta, expectedReplicasDelta); } else { int oldReplicas = repl.liveReplicas()-curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; - neededReplications.remove(block, oldReplicas, + neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), oldExpectedReplicas); } } finally { @@ -3471,6 +3485,7 @@ public class BlockManager implements BlockStatsMXBean { final int pending = pendingReplications.getNumReplicas(block); if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) { neededReplications.add(block, n.liveReplicas() + pending, + n.readOnlyReplicas(), n.decommissionedAndDecommissioning(), expected); } else if (n.liveReplicas() > expected) { processOverReplicatedBlock(block, expected, null, null); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index be1c2b9edb8..28024b1d40e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -573,7 +573,7 @@ public class DecommissionManager { blockManager.isPopulatingReplQueues()) { // Process these blocks only when active NN is out of safe mode. 
blockManager.neededReplications.add(block, - curReplicas, + liveReplicas, num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), block.getReplication()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java index e567bbf3a2f..44ae6f6a462 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; */ public class NumberReplicas { private int liveReplicas; + private int readOnlyReplicas; // Tracks only the decommissioning replicas private int decommissioning; @@ -33,17 +34,18 @@ public class NumberReplicas { private int replicasOnStaleNodes; NumberReplicas() { - initialize(0, 0, 0, 0, 0, 0); + this(0, 0, 0, 0, 0, 0, 0); } - NumberReplicas(int live, int decommissioned, int decommissioning, int corrupt, - int excess, int stale) { - initialize(live, decommissioned, decommissioning, corrupt, excess, stale); + NumberReplicas(int live, int readonly, int decommissioned, + int decommissioning, int corrupt, int excess, int stale) { + set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale); } - void initialize(int live, int decommissioned, int decommissioning, - int corrupt, int excess, int stale) { + void set(int live, int readonly, int decommissioned, int decommissioning, + int corrupt, int excess, int stale) { liveReplicas = live; + readOnlyReplicas = readonly; this.decommissioning = decommissioning; this.decommissioned = decommissioned; corruptReplicas = corrupt; @@ -55,6 +57,10 @@ public class NumberReplicas { return liveReplicas; } + public int readOnlyReplicas() { + return readOnlyReplicas; + } + /** * * 
@return decommissioned replicas + decommissioning replicas diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index 0fce69c5937..bc2fe7ad0fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; -import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; + import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; /** * Keep prioritized queues of under replicated blocks. @@ -34,7 +36,11 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; * *

* The policy for choosing which priority to give added blocks - * is implemented in {@link #getPriority(int, int, int)}. + * is implemented in {@link #getPriority(int, int, int, int)}. *

*

The queue order is as follows:

*
    @@ -146,6 +152,7 @@ class UnderReplicatedBlocks implements Iterable { * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) */ private int getPriority(int curReplicas, + int readOnlyReplicas, int decommissionedReplicas, int expectedReplicas) { assert curReplicas >= 0 : "Negative replicas!"; @@ -158,6 +165,11 @@ class UnderReplicatedBlocks implements Iterable { if (decommissionedReplicas > 0) { return QUEUE_HIGHEST_PRIORITY; } + if (readOnlyReplicas > 0) { + // only has read-only replicas, highest risk + // since the read-only replicas may go down all together. + return QUEUE_HIGHEST_PRIORITY; + } //all we have are corrupt blocks return QUEUE_WITH_CORRUPT_BLOCKS; } else if (curReplicas == 1) { @@ -183,11 +195,12 @@ class UnderReplicatedBlocks implements Iterable { */ synchronized boolean add(BlockInfo block, int curReplicas, + int readOnlyReplicas, int decomissionedReplicas, int expectedReplicas) { assert curReplicas >= 0 : "Negative replicas!"; - int priLevel = getPriority(curReplicas, decomissionedReplicas, - expectedReplicas); + final int priLevel = getPriority(curReplicas, readOnlyReplicas, + decomissionedReplicas, expectedReplicas); if(priorityQueues.get(priLevel).add(block)) { if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && expectedReplicas == 1) { @@ -207,11 +220,11 @@ class UnderReplicatedBlocks implements Iterable { /** remove a block from a under replication queue */ synchronized boolean remove(BlockInfo block, int oldReplicas, + int oldReadOnlyReplicas, int decommissionedReplicas, int oldExpectedReplicas) { - int priLevel = getPriority(oldReplicas, - decommissionedReplicas, - oldExpectedReplicas); + final int priLevel = getPriority(oldReplicas, oldReadOnlyReplicas, + decommissionedReplicas, oldExpectedReplicas); boolean removedBlock = remove(block, priLevel); if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && oldExpectedReplicas == 1 && @@ -250,10 +263,10 @@ class UnderReplicatedBlocks implements Iterable { // Try to remove the block from all 
queues if the block was // not found in the queue for the given priority level. for (int i = 0; i < LEVEL; i++) { - if (priorityQueues.get(i).remove(block)) { + if (i != priLevel && priorityQueues.get(i).remove(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" + - " {} from priority queue {}", block, priLevel); + " {} from priority queue {}", block, i); return true; } } @@ -278,15 +291,15 @@ class UnderReplicatedBlocks implements Iterable { * @param expectedReplicasDelta the change in the expected replica count from before */ synchronized void update(BlockInfo block, int curReplicas, - int decommissionedReplicas, + int readOnlyReplicas, int decommissionedReplicas, int curExpectedReplicas, int curReplicasDelta, int expectedReplicasDelta) { int oldReplicas = curReplicas-curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; - int curPri = getPriority(curReplicas, decommissionedReplicas, - curExpectedReplicas); - int oldPri = getPriority(oldReplicas, decommissionedReplicas, - oldExpectedReplicas); + int curPri = getPriority(curReplicas, readOnlyReplicas, + decommissionedReplicas, curExpectedReplicas); + int oldPri = getPriority(oldReplicas, readOnlyReplicas, + decommissionedReplicas, oldExpectedReplicas); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + block + @@ -336,143 +349,69 @@ class UnderReplicatedBlocks implements Iterable { * @return Return a list of block lists to be replicated. The block list index * represents its replication priority. 
*/ - public synchronized List> chooseUnderReplicatedBlocks( + synchronized List> chooseUnderReplicatedBlocks( int blocksToProcess) { - // initialize data structure for the return value - List> blocksToReplicate = new ArrayList<>(LEVEL); - for (int i = 0; i < LEVEL; i++) { - blocksToReplicate.add(new ArrayList()); - } - - if (size() == 0) { // There are no blocks to collect. - return blocksToReplicate; - } + final List> blocksToReplicate = new ArrayList<>(LEVEL); - int blockCount = 0; - for (int priority = 0; priority < LEVEL; priority++) { - // Go through all blocks that need replications with current priority. - BlockIterator neededReplicationsIterator = iterator(priority); - // Set the iterator to the first unprocessed block at this priority level. - neededReplicationsIterator.setToBookmark(); + int count = 0; + int priority = 0; + for (; count < blocksToProcess && priority < LEVEL; priority++) { + if (priority == QUEUE_WITH_CORRUPT_BLOCKS) { + // do not choose corrupted blocks. + continue; + } - blocksToProcess = Math.min(blocksToProcess, size()); - - if (blockCount == blocksToProcess) { - break; // break if already expected blocks are obtained - } - + // Go through all blocks that need replications with current priority. + // Set the iterator to the first unprocessed block at this priority level. + final Iterator i = priorityQueues.get(priority).getBookmark(); + final List blocks = new LinkedList<>(); + blocksToReplicate.add(blocks); // Loop through all remaining blocks in the list. - while (blockCount < blocksToProcess - && neededReplicationsIterator.hasNext()) { - BlockInfo block = neededReplicationsIterator.next(); - blocksToReplicate.get(priority).add(block); - blockCount++; - } - - if (!neededReplicationsIterator.hasNext() - && neededReplicationsIterator.getPriority() == LEVEL - 1) { - // Reset all priorities' bookmarks to the beginning because there were - // no recently added blocks in any list. 
- for (int i = 0; i < LEVEL; i++) { - this.priorityQueues.get(i).resetBookmark(); - } - break; + for(; count < blocksToProcess && i.hasNext(); count++) { + blocks.add(i.next()); } } + + if (priority == LEVEL) { + // Reset all bookmarks because there were no recently added blocks. + for (LightWeightLinkedSet q : priorityQueues) { + q.resetBookmark(); + } + } + return blocksToReplicate; } /** returns an iterator of all blocks in a given priority queue */ - synchronized BlockIterator iterator(int level) { - return new BlockIterator(level); + synchronized Iterator iterator(int level) { + return priorityQueues.get(level).iterator(); } /** return an iterator of all the under replication blocks */ @Override - public synchronized BlockIterator iterator() { - return new BlockIterator(); - } + public synchronized Iterator iterator() { + final Iterator> q = priorityQueues.iterator(); + return new Iterator() { + private Iterator b = q.next().iterator(); - /** - * An iterator over blocks. - */ - class BlockIterator implements Iterator { - private int level; - private boolean isIteratorForLevel = false; - private final List> iterators = new ArrayList<>(); - - /** - * Construct an iterator over all queues. 
- */ - private BlockIterator() { - level=0; - for(int i=0; i> chosenBlocks, int firstPrioritySize, - int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize, - int fifthPrioritySize) { - assertEquals( - "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks", - firstPrioritySize, chosenBlocks.get( - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size()); - assertEquals( - "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks", - secondPrioritySize, chosenBlocks.get( - UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size()); - assertEquals( - "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks", - thirdPrioritySize, chosenBlocks.get( - UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size()); - assertEquals( - "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks", - fourthPrioritySize, chosenBlocks.get( - UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size()); - assertEquals( - "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks", - fifthPrioritySize, chosenBlocks.get( - UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size()); + List> chosenBlocks, int... 
expectedSizes) { + int i = 0; + for(; i < chosenBlocks.size(); i++) { + assertEquals("Not returned the expected number for i=" + i, + expectedSizes[i], chosenBlocks.get(i).size()); + } + for(; i < expectedSizes.length; i++) { + assertEquals("Expected size is non-zero for i=" + i, 0, expectedSizes[i]); + } } /** @@ -1101,14 +1086,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Adding QUEUE_VERY_UNDER_REPLICATED block final int block1CurReplicas = 2; final int block1ExpectedReplicas = 7; - underReplicatedBlocks.add(block1, block1CurReplicas, 0, + underReplicatedBlocks.add(block1, block1CurReplicas, 0, 0, block1ExpectedReplicas); // Adding QUEUE_VERY_UNDER_REPLICATED block - underReplicatedBlocks.add(block2, 2, 0, 7); + underReplicatedBlocks.add(block2, 2, 0, 0, 7); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(block3, 2, 0, 6); + underReplicatedBlocks.add(block3, 2, 0, 0, 6); List> chosenBlocks; @@ -1119,7 +1104,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Increasing the replications will move the block down a // priority. This simulates a replica being completed in between checks. - underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, + underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, 0, block1ExpectedReplicas, 1, 0); // Choose 1 block from UnderReplicatedBlocks. 
Then it should pick 1 block @@ -1147,10 +1132,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(block1, 0, 1, 1); + underReplicatedBlocks.add(block1, 0, 0, 1, 1); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(block2, 0, 1, 1); + underReplicatedBlocks.add(block2, 0, 0, 1, 1); List> chosenBlocks; @@ -1196,10 +1181,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(block1, 0, 1, 1); + underReplicatedBlocks.add(block1, 0, 0, 1, 1); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(block2, 0, 1, 1); + underReplicatedBlocks.add(block2, 0, 0, 1, 1); List> chosenBlocks; @@ -1258,10 +1243,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(block1, 0, 1, 1); + underReplicatedBlocks.add(block1, 0, 0, 1, 1); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(block2, 0, 1, 1); + underReplicatedBlocks.add(block2, 0, 0, 1, 1); List> chosenBlocks; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java index de36e077aea..2efc90d1e9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java @@ -18,6 +18,8 @@ package 
org.apache.hadoop.hdfs.server.blockmanagement; +import java.util.Iterator; + import org.apache.hadoop.hdfs.protocol.Block; import org.junit.Test; @@ -53,7 +55,7 @@ public class TestUnderReplicatedBlockQueues { assertEquals(1, queues.size()); assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY); //repeated additions fail - assertFalse(queues.add(block1, 1, 0, 3)); + assertFalse(queues.add(block1, 1, 0, 0, 3)); //add a second block with two replicas assertAdded(queues, block2, 2, 0, 3); @@ -77,11 +79,11 @@ public class TestUnderReplicatedBlockQueues { assertAdded(queues, block_corrupt_repl_one, 0, 0, 1); assertEquals(2, queues.getCorruptBlockSize()); assertEquals(1, queues.getCorruptReplOneBlockSize()); - queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2); + queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2); assertEquals(0, queues.getCorruptReplOneBlockSize()); - queues.update(block_corrupt, 0, 0, 1, 0, -2); + queues.update(block_corrupt, 0, 0, 0, 1, 0, -2); assertEquals(1, queues.getCorruptReplOneBlockSize()); - queues.update(block_very_under_replicated, 0, 0, 1, -4, -24); + queues.update(block_very_under_replicated, 0, 0, 0, 1, -4, -24); assertEquals(2, queues.getCorruptReplOneBlockSize()); } @@ -92,7 +94,7 @@ public class TestUnderReplicatedBlockQueues { int expectedReplicas) { assertTrue("Failed to add " + block, queues.add(block, - curReplicas, + curReplicas, 0, decomissionedReplicas, expectedReplicas)); } @@ -110,7 +112,7 @@ public class TestUnderReplicatedBlockQueues { private void assertInLevel(UnderReplicatedBlocks queues, Block block, int level) { - UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level); + final Iterator bi = queues.iterator(level); while (bi.hasNext()) { Block next = bi.next(); if (block.equals(next)) {