diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e9c11b968ad..4e450e2d5fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1999,6 +1999,7 @@ public class BlockManager implements BlockStatsMXBean { (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block)); } + @VisibleForTesting BlockReconstructionWork scheduleReconstruction(BlockInfo block, int priority) { // skip abandoned block or block reopened for append @@ -2043,7 +2044,9 @@ public class BlockManager implements BlockStatsMXBean { additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas() - pendingNum; } else { - additionalReplRequired = 1; // Needed on a new rack + // Violates placement policy. Needed on a new rack or domain etc. + BlockPlacementStatus placementStatus = getBlockPlacementStatus(block); + additionalReplRequired = placementStatus.getAdditionalReplicasRequired(); } final BlockCollection bc = getBlockCollection(block); @@ -2076,20 +2079,6 @@ public class BlockManager implements BlockStatsMXBean { } } - private boolean isInNewRack(DatanodeDescriptor[] srcs, - DatanodeDescriptor target) { - LOG.debug("check if target {} increases racks, srcs={}", target, - Arrays.asList(srcs)); - for (DatanodeDescriptor src : srcs) { - if (!src.isDecommissionInProgress() && - src.getNetworkLocation().equals(target.getNetworkLocation())) { - LOG.debug("the target {} is in the same rack with src {}", target, src); - return false; - } - } - return true; - } - private boolean validateReconstructionWork(BlockReconstructionWork rw) { BlockInfo block = rw.getBlock(); int priority = rw.getPriority(); @@ -2115,10 +2104,16 @@ public class BlockManager implements BlockStatsMXBean { } DatanodeStorageInfo[] targets = rw.getTargets(); + BlockPlacementStatus placementStatus = getBlockPlacementStatus(block); if ((numReplicas.liveReplicas() >= requiredRedundancy) && - (!isPlacementPolicySatisfied(block)) ) { - if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) { - // No use continuing, unless a new rack in this case + (!placementStatus.isPlacementPolicySatisfied())) { + BlockPlacementStatus newPlacementStatus = + getBlockPlacementStatus(block, targets); + if (!newPlacementStatus.isPlacementPolicySatisfied() && + (newPlacementStatus.getAdditionalReplicasRequired() >= + placementStatus.getAdditionalReplicasRequired())) { + // If the new targets do not meet the placement policy, or at least + // reduce the number of replicas needed, then no use continuing. return false; } // mark that the reconstruction work is to replicate internal block to a @@ -4512,7 +4507,25 @@ public class BlockManager implements BlockStatsMXBean { } boolean isPlacementPolicySatisfied(BlockInfo storedBlock) { + return getBlockPlacementStatus(storedBlock, null) + .isPlacementPolicySatisfied(); + } + + BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock) { + return getBlockPlacementStatus(storedBlock, null); + } + + BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock, + DatanodeStorageInfo[] additionalStorage) { List liveNodes = new ArrayList<>(); + if (additionalStorage != null) { + // additionalNodes, are potential new targets for the block. If there are + // any passed, include them when checking the placement policy to see if + // the policy is met, when it may not have been met without these nodes. + for (DatanodeStorageInfo s : additionalStorage) { + liveNodes.add(getDatanodeDescriptorFromStorage(s)); + } + } Collection corruptNodes = corruptReplicas .getNodes(storedBlock); for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) { @@ -4520,7 +4533,22 @@ public class BlockManager implements BlockStatsMXBean { && storage.getState() == State.NORMAL) { // assume the policy is satisfied for blocks on PROVIDED storage // as long as the storage is in normal state. - return true; + return new BlockPlacementStatus() { + @Override + public boolean isPlacementPolicySatisfied() { + return true; + } + + @Override + public String getErrorDescription() { + return null; + } + + @Override + public int getAdditionalReplicasRequired() { + return 0; + } + }; } final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage); // Nodes under maintenance should be counted as valid replicas from @@ -4536,8 +4564,7 @@ public class BlockManager implements BlockStatsMXBean { .getPolicy(blockType); int numReplicas = blockType == STRIPED ? ((BlockInfoStriped) storedBlock) .getRealTotalBlockNum() : storedBlock.getReplication(); - return placementPolicy.verifyBlockPlacement(locs, numReplicas) - .isPlacementPolicySatisfied(); + return placementPolicy.verifyBlockPlacement(locs, numReplicas); } boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java index e2ac54a3537..a227666b871 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java @@ -39,4 +39,12 @@ public interface BlockPlacementStatus { */ public String getErrorDescription(); + /** + * Return the number of additional replicas needed to ensure the block + * placement policy is satisfied. + * @return The number of new replicas needed to satisify the placement policy + * or zero if no extra are needed + */ + int getAdditionalReplicasRequired(); + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java index 75bb65d9014..761214234c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java @@ -45,4 +45,12 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus { " more rack(s). Total number of racks in the cluster: " + totalRacks; } + @Override + public int getAdditionalReplicasRequired() { + if (isPlacementPolicySatisfied()) { + return 0; + } else { + return requiredRacks - currentRacks; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java index b98b3dac362..ac5a5b51d9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java @@ -78,4 +78,15 @@ public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus { } return errorDescription.toString(); } + + @Override + public int getAdditionalReplicasRequired() { + if (isPlacementPolicySatisfied()) { + return 0; + } else { + int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired(); + int child = requiredNodeGroups - currentNodeGroups.size(); + return Math.max(parent, child); + } + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java index 4b3c3cc3830..b839cede2bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java @@ -85,4 +85,24 @@ public class BlockPlacementStatusWithUpgradeDomain implements } return errorDescription.toString(); } -} \ No newline at end of file + + @Override + public int getAdditionalReplicasRequired() { + if (isPlacementPolicySatisfied()) { + return 0; + } else { + // It is possible for a block to have the correct number of upgrade + // domains, but only a single rack, or be on multiple racks, but only in + // one upgrade domain. + int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired(); + int child; + + if (numberOfReplicas <= upgradeDomainFactor) { + child = numberOfReplicas - upgradeDomains.size(); + } else { + child = upgradeDomainFactor - upgradeDomains.size(); + } + return Math.max(parent, child); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 37996ae18e4..3d0d8828467 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -526,17 +526,24 @@ public class DFSTestUtil { } } + public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, + int racks, int replicas, int neededReplicas) + throws TimeoutException, InterruptedException { + waitForReplication(cluster, b, racks, replicas, neededReplicas, 0); + } + /* * Wait up to 20s for the given block to be replicated across * the requested number of racks, with the requested number of * replicas, and the requested number of replicas still needed. */ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, - int racks, int replicas, int neededReplicas) + int racks, int replicas, int neededReplicas, int neededDomains) throws TimeoutException, InterruptedException { int curRacks = 0; int curReplicas = 0; int curNeededReplicas = 0; + int curDomains = 0; int count = 0; final int ATTEMPTS = 20; @@ -547,17 +554,21 @@ public class DFSTestUtil { curRacks = r[0]; curReplicas = r[1]; curNeededReplicas = r[2]; + curDomains = r[3]; count++; } while ((curRacks != racks || curReplicas != replicas || - curNeededReplicas != neededReplicas) && count < ATTEMPTS); + curNeededReplicas != neededReplicas || + (neededDomains != 0 && curDomains != neededDomains)) + && count < ATTEMPTS); if (count == ATTEMPTS) { throw new TimeoutException("Timed out waiting for replication." + " Needed replicas = "+neededReplicas + " Cur needed replicas = "+curNeededReplicas + " Replicas = "+replicas+" Cur replicas = "+curReplicas - + " Racks = "+racks+" Cur racks = "+curRacks); + + " Racks = "+racks+" Cur racks = "+curRacks + + " Domains = "+neededDomains+" Cur domains = "+curDomains); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index dfb40a6c07b..fff909f3f44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -81,7 +81,8 @@ public class BlockManagerTestUtil { /** * @return a tuple of the replica state (number racks, number live - * replicas, and number needed replicas) for the given block. + * replicas, number needed replicas and number of UpgradeDomains) for the + * given block. */ public static int[] getReplicaInfo(final FSNamesystem namesystem, final Block b) { final BlockManager bm = namesystem.getBlockManager(); @@ -90,7 +91,8 @@ public class BlockManagerTestUtil { final BlockInfo storedBlock = bm.getStoredBlock(b); return new int[]{getNumberOfRacks(bm, b), bm.countNodes(storedBlock).liveReplicas(), - bm.neededReconstruction.contains(storedBlock) ? 1 : 0}; + bm.neededReconstruction.contains(storedBlock) ? 1 : 0, + getNumberOfDomains(bm, b)}; } finally { namesystem.readUnlock(); } @@ -120,6 +122,30 @@ public class BlockManagerTestUtil { return rackSet.size(); } + /** + * @return the number of UpgradeDomains over which a given block is replicated + * decommissioning/decommissioned nodes are not counted. corrupt replicas + * are also ignored. + */ + private static int getNumberOfDomains(final BlockManager blockManager, + final Block b) { + final Set domSet = new HashSet(0); + final Collection corruptNodes = + getCorruptReplicas(blockManager).getNodes(b); + for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) { + final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); + if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { + if ((corruptNodes == null) || !corruptNodes.contains(cur)) { + String domain = cur.getUpgradeDomain(); + if (domain != null && !domSet.contains(domain)) { + domSet.add(domain); + } + } + } + } + return domSet.size(); + } + /** * @return redundancy monitor thread instance from block manager. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java new file mode 100644 index 00000000000..6b0733452ca --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import org.junit.Test; + +/** + * Unit tests to validate the BlockPlacementStatusDefault policy, focusing on + * the getAdditionAlReplicasRequired method. + */ +public class TestBlockPlacementStatusDefault { + + @Test + public void testIsPolicySatisfiedCorrectly() { + // 2 current racks and 2 expected + BlockPlacementStatusDefault bps = + new BlockPlacementStatusDefault(2, 2, 5); + assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); + + // 1 current rack and 2 expected + bps = + new BlockPlacementStatusDefault(1, 2, 5); + assertFalse(bps.isPlacementPolicySatisfied()); + assertEquals(1, bps.getAdditionalReplicasRequired()); + + // 3 current racks and 2 expected + bps = + new BlockPlacementStatusDefault(3, 2, 5); + assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); + + // 1 current rack and 2 expected, but only 1 rack on the cluster + bps = + new BlockPlacementStatusDefault(1, 2, 1); + assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java index bfff9328a66..1e0fb76d92a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,11 +50,13 @@ public class TestBlockPlacementStatusWithUpgradeDomain { @Test public void testIsPolicySatisfiedParentFalse() { when(bpsd.isPlacementPolicySatisfied()).thenReturn(false); + when(bpsd.getAdditionalReplicasRequired()).thenReturn(1); BlockPlacementStatusWithUpgradeDomain bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3); // Parent policy is not satisfied but upgrade domain policy is assertFalse(bps.isPlacementPolicySatisfied()); + assertEquals(1, bps.getAdditionalReplicasRequired()); } @Test @@ -63,21 +66,73 @@ public class TestBlockPlacementStatusWithUpgradeDomain { // Number of domains, replicas and upgradeDomainFactor is equal and parent // policy is satisfied assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); } @Test - public void testIsPolicySatisifedSmallDomains() { + public void testIsPolicySatisfiedSmallDomains() { // Number of domains is less than replicas but equal to factor BlockPlacementStatusWithUpgradeDomain bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 3); assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); // Same as above but replicas is greater than factor bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 2); assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); // Number of domains is less than replicas and factor bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 4); assertFalse(bps.isPlacementPolicySatisfied()); + assertEquals(1, bps.getAdditionalReplicasRequired()); } -} \ No newline at end of file + + @Test + public void testIsPolicySatisfiedSmallReplicas() { + // Replication factor 1 file + upgradeDomains.clear(); + upgradeDomains.add("1"); + BlockPlacementStatusWithUpgradeDomain bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 1, 3); + assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); + + // Replication factor 2 file, but one domain + bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3); + assertFalse(bps.isPlacementPolicySatisfied()); + assertEquals(1, bps.getAdditionalReplicasRequired()); + + // Replication factor 2 file, but two domains + upgradeDomains.add("2"); + bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3); + assertTrue(bps.isPlacementPolicySatisfied()); + assertEquals(0, bps.getAdditionalReplicasRequired()); + } + + @Test + public void testPolicyIsNotSatisfiedInsufficientDomains() { + // Insufficient Domains - 1 domain, replication factor 3 + upgradeDomains.clear(); + upgradeDomains.add("1"); + BlockPlacementStatusWithUpgradeDomain bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3); + assertFalse(bps.isPlacementPolicySatisfied()); + assertEquals(2, bps.getAdditionalReplicasRequired()); + + // One domain, replication factor 2 file + bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3); + assertFalse(bps.isPlacementPolicySatisfied()); + assertEquals(1, bps.getAdditionalReplicasRequired()); + + // 2 domains, replication factor 3 + upgradeDomains.add("2"); + bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3); + assertFalse(bps.isPlacementPolicySatisfied()); + assertEquals(1, bps.getAdditionalReplicasRequired()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java index 2bf6045b6f4..dda5fef191d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java @@ -21,7 +21,9 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertArrayEquals; +import java.io.IOException; import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,6 +45,8 @@ import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; import org.slf4j.event.Level; +import static org.junit.Assert.*; + public class TestBlocksWithNotEnoughRacks { public static final Log LOG = LogFactory.getLog(TestBlocksWithNotEnoughRacks.class); static { @@ -472,4 +476,105 @@ public class TestBlocksWithNotEnoughRacks { hostsFileWriter.cleanup(); } } + + @Test + public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception { + Configuration conf = getConf(); + final short replicationFactor = 3; + final Path filePath = new Path("/testFile"); + + conf.set("dfs.block.replicator.classname", + "org.apache.hadoop.hdfs.server.blockmanagement." + + "BlockPlacementPolicyWithUpgradeDomain"); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(6).build(); + cluster.waitClusterUp(); + + List dnDescriptors = getDnDescriptors(cluster); + + try { + // Create a file with one block with a replication factor of 3 + // No upgrade domains are set. + final FileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L); + ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath); + + BlockManager bm = cluster.getNamesystem().getBlockManager(); + BlockInfo storedBlock = bm.getStoredBlock(b.getLocalBlock()); + + // The block should be replicated OK - so Reconstruction Work will be null + BlockReconstructionWork work = bm.scheduleReconstruction(storedBlock, 2); + assertNull(work); + // Set the upgradeDomain to "3" for the 3 nodes hosting the block. + // Then alternately set the remaining 3 nodes to have an upgradeDomain + // of 0 or 1 giving a total of 3 upgradeDomains. + for (int i=0; i dnDescriptors = getDnDescriptors(cluster); + for (int i=0; i < dnDescriptors.size(); i++) { + dnDescriptors.get(i).setUpgradeDomain(Integer.toString(i%3)); + } + try { + final FileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L); + ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath); + fs.setReplication(filePath, replicationFactor); + DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0, 3); + } finally { + cluster.shutdown(); + } + } + + private List getDnDescriptors(MiniDFSCluster cluster) + throws IOException { + List dnDesc = new ArrayList<>(); + DatanodeManager dnManager = cluster.getNamesystem().getBlockManager() + .getDatanodeManager(); + for (DataNode dn : cluster.getDataNodes()) { + DatanodeDescriptor d = dnManager.getDatanode(dn.getDatanodeUuid()); + if (d == null) { + throw new IOException("DatanodeDescriptor not found for DN "+ + dn.getDatanodeUuid()); + } + dnDesc.add(d); + } + return dnDesc; + } }