diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 38b4904aa29..e652298c32b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -605,6 +605,9 @@ Release 2.3.0 - UNRELEASED HDFS-5873. dfs.http.policy should have higher precedence over dfs.https.enable. (Haohui Mai via jing9) + HDFS-5837. dfs.namenode.replication.considerLoad should consider + decommissioned nodes. (Tao Luo via shv) + BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS HDFS-4985. Add storage type to the protocol and expose it in block report diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index f4dc208d731..8b740cd94c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -633,9 +633,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // check the communication traffic of the target machine if (considerLoad) { double avgLoad = 0; - int size = clusterMap.getNumOfLeaves(); - if (size != 0 && stats != null) { - avgLoad = (double)stats.getTotalLoad()/size; + if (stats != null) { + int size = stats.getNumDatanodesInService(); + if (size != 0) { + avgLoad = (double)stats.getTotalLoad()/size; + } } if (node.getXceiverCount() > (2.0 * avgLoad)) { logNodeIsNotChosen(storage, "the node is too busy "); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java index f4827f38c8a..676aa0826c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java @@ -42,6 +42,12 @@ public interface FSClusterStats { * for writing targets, and false otherwise. */ public boolean isAvoidingStaleDataNodesForWrite(); + + /** + * Indicates number of datanodes that are in service. + * @return Number of datanodes that are both alive and not decommissioned. + */ + public int getNumDatanodesInService(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index eb71c115c9b..1fa7057b1e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -6801,7 +6801,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return this.blockManager.getDatanodeManager() .shouldAvoidStaleDataNodesForWrite(); } - + + @Override // FSClusterStats + public int getNumDatanodesInService() { + return getNumLiveDataNodes() - getNumDecomLiveDataNodes(); + } + public SnapshotManager getSnapshotManager() { return snapshotManager; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java new file mode 100644 index 00000000000..ab88e93ca61 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.test.PathUtils; +import org.apache.hadoop.util.VersionInfo; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestReplicationPolicyConsiderLoad { + + private static NameNode namenode; + private static DatanodeManager dnManager; + private static List dnrList; + private static DatanodeDescriptor[] dataNodes; + private static DatanodeStorageInfo[] storages; + + @BeforeClass + public static void setupCluster() throws IOException { + Configuration conf = new HdfsConfiguration(); + final String[] racks = { + "/rack1", + "/rack1", + "/rack1", + "/rack2", + "/rack2", + "/rack2"}; + storages = DFSTestUtil.createDatanodeStorageInfos(racks); + dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); + FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + new File(baseDir, "name").getPath()); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); + DFSTestUtil.formatNameNode(conf); + namenode = new NameNode(conf); + int blockSize = 1024; + + dnrList = new ArrayList(); + dnManager = namenode.getNamesystem().getBlockManager().getDatanodeManager(); + + // Register DNs + for (int i=0; i < 6; i++) { + DatanodeRegistration dnr = new DatanodeRegistration(dataNodes[i], + new StorageInfo(), new ExportedBlockKeys(), VersionInfo.getVersion()); + dnrList.add(dnr); + dnManager.registerDatanode(dnr); + dataNodes[i].getStorageInfos()[0].setUtilizationForTesting( + 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L, + 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L); + dataNodes[i].updateHeartbeat( + BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]), + 0L, 0L, 0, 0); + } + } + + /** + * Tests that chooseTarget with considerLoad set to true correctly calculates + * load with decommissioned nodes. + */ + @Test + public void testChooseTargetWithDecomNodes() throws IOException { + namenode.getNamesystem().writeLock(); + try { + // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget() + // returns false + for (int i = 0; i < 3; i++) { + DatanodeInfo d = dnManager.getDatanodeByXferAddr( + dnrList.get(i).getIpAddr(), + dnrList.get(i).getXferPort()); + d.setDecommissioned(); + } + String blockPoolId = namenode.getNamesystem().getBlockPoolId(); + dnManager.handleHeartbeat(dnrList.get(3), + BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]), + blockPoolId, dataNodes[3].getCacheCapacity(), + dataNodes[3].getCacheRemaining(), + 2, 0, 0); + dnManager.handleHeartbeat(dnrList.get(4), + BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]), + blockPoolId, dataNodes[4].getCacheCapacity(), + dataNodes[4].getCacheRemaining(), + 4, 0, 0); + dnManager.handleHeartbeat(dnrList.get(5), + BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]), + blockPoolId, dataNodes[5].getCacheCapacity(), + dataNodes[5].getCacheRemaining(), + 4, 0, 0); + + // Call chooseTarget() + DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() + .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, + dataNodes[0], new ArrayList(), false, null, + 1024, StorageType.DEFAULT); + + assertEquals(3, targets.length); + Set targetSet = new HashSet( + Arrays.asList(targets)); + for (int i = 3; i < storages.length; i++) { + assertTrue(targetSet.contains(storages[i])); + } + } finally { + dataNodes[0].stopDecommission(); + dataNodes[1].stopDecommission(); + dataNodes[2].stopDecommission(); + namenode.getNamesystem().writeUnlock(); + } + } + + @AfterClass + public static void teardownCluster() { + if (namenode != null) namenode.stop(); + } + +}