diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 3ea232258aa..9d2bb786836 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -435,7 +435,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * @param storageTypes storage type to be considered for target * @return local node of writer (not chosen node) */ - private Node chooseTarget(int numOfReplicas, + private Node chooseTarget(final int numOfReplicas, Node writer, final Set excludedNodes, final long blocksize, @@ -469,7 +469,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { LOG.trace("storageTypes={}", storageTypes); try { - if ((numOfReplicas = requiredStorageTypes.size()) == 0) { + if (requiredStorageTypes.size() == 0) { throw new NotEnoughReplicasException( "All required storage types are unavailable: " + " unavailableStorages=" + unavailableStorages @@ -498,10 +498,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { for (DatanodeStorageInfo resultStorage : results) { addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes); } - // Set numOfReplicas, since it can get out of sync with the result list + // Set newNumOfReplicas, since it can get out of sync with the result list // if the NotEnoughReplicasException was thrown in chooseRandom(). - numOfReplicas = totalReplicasExpected - results.size(); - return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, + int newNumOfReplicas = totalReplicasExpected - results.size(); + return chooseTarget(newNumOfReplicas, writer, oldExcludedNodes, blocksize, maxNodesPerRack, results, false, storagePolicy, unavailableStorages, newBlock, null); } @@ -520,8 +520,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes); } - numOfReplicas = totalReplicasExpected - results.size(); - return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, + int newNumOfReplicas = totalReplicasExpected - results.size(); + return chooseTarget(newNumOfReplicas, writer, oldExcludedNodes, blocksize, maxNodesPerRack, results, false, storagePolicy, unavailableStorages, newBlock, null); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index 54c3eda4b86..e7212062a8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -1337,6 +1337,59 @@ public class TestBlockStoragePolicy { Assert.assertEquals(StorageType.DISK, targets[1].getStorageType()); } + /** + * Consider a File with All_SSD storage policy. + * 1. Choose 3 DISK DNs for pipeline because SSD DNs no enough at + * the beginning. + * 2. One of DISK DNs fails And it need choose one new DN for existing. + * pipeline {@link DataStreamer addDatanode2ExistingPipeline()}. + * Make sure the number of target DNs are 3. + * see HDFS-16182. + */ + @Test + public void testAddDatanode2ExistingPipelineInSsd() throws Exception { + BlockStoragePolicy policy = POLICY_SUITE.getPolicy(ALLSSD); + + final String[] racks = {"/d1/r1", "/d2/r2", "/d3/r3", "/d4/r4", "/d5/r5", + "/d6/r6", "/d7/r7"}; + final String[] hosts = {"host1", "host2", "host3", "host4", "host5", + "host6", "host7"}; + final StorageType[] disks = {StorageType.DISK, StorageType.DISK, StorageType.DISK}; + + final DatanodeStorageInfo[] diskStorages + = DFSTestUtil.createDatanodeStorageInfos(7, racks, hosts, disks); + final DatanodeDescriptor[] dataNodes + = DFSTestUtil.toDatanodeDescriptor(diskStorages); + for (int i = 0; i < dataNodes.length; i++) { + BlockManagerTestUtil.updateStorage(dataNodes[i], + new DatanodeStorage("ssd" + i + 1, DatanodeStorage.State.NORMAL, + StorageType.SSD)); + } + + FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + new File(baseDir, "name").getPath()); + DFSTestUtil.formatNameNode(conf); + NameNode namenode = new NameNode(conf); + + final BlockManager bm = namenode.getNamesystem().getBlockManager(); + BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy(); + NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology(); + for (DatanodeDescriptor datanode : dataNodes) { + cluster.add(datanode); + } + // chsenDs are DISK StorageType to simulate not enough SDD Storage + List chsenDs = new ArrayList<>(); + chsenDs.add(diskStorages[0]); + chsenDs.add(diskStorages[1]); + DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 1, + null, chsenDs, true, + new HashSet(), 0, policy, null); + Assert.assertEquals(3, targets.length); + } + @Test public void testGetFileStoragePolicyAfterRestartNN() throws Exception { //HDFS8219