From 4d01dbda508691beb07a4c8bfe113ec568166ddc Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 2 Oct 2015 11:41:23 -0700 Subject: [PATCH] HDFS-9015. Refactor TestReplicationPolicy to test different block placement policies. (Ming Ma via lei) (cherry picked from commit 260b9d9410e45dbcb89d97d58450c79220c9e7bc) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../BaseReplicationPolicyTest.java | 160 ++++++++++++++++++ .../TestReplicationPolicy.java | 148 ++++------------ .../TestReplicationPolicyConsiderLoad.java | 121 ++++--------- .../TestReplicationPolicyWithNodeGroup.java | 159 +++-------------- 5 files changed, 256 insertions(+), 335 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 05ce9496883..5eb92472f73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -662,6 +662,9 @@ Release 2.8.0 - UNRELEASED 'CredentialBasedAccessTokenProvider.getCredential()' abstract methods to public (Santhosh Nayak via cnauroth) + HDFS-9015. Refactor TestReplicationPolicy to test different block placement + policies. (Ming Ma via lei) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java new file mode 100644 index 00000000000..c541da33308 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.TestBlockStoragePolicy; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Before; + +abstract public class BaseReplicationPolicyTest { + { + GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL); + } + + protected NetworkTopology cluster; + protected DatanodeDescriptor dataNodes[]; + protected static final int BLOCK_SIZE = 1024; + protected NameNode namenode; + protected DatanodeManager dnManager; + protected BlockPlacementPolicy replicator; + protected final String filename = "/dummyfile.txt"; + protected DatanodeStorageInfo[] storages; + protected String blockPlacementPolicy; + protected NamenodeProtocols nameNodeRpc = null; + + static void updateHeartbeatWithUsage(DatanodeDescriptor dn, + long capacity, long dfsUsed, long remaining, long blockPoolUsed, + long dnCacheCapacity, long dnCacheUsed, int xceiverCount, + int volFailures) { + dn.getStorageInfos()[0].setUtilizationForTesting( + capacity, dfsUsed, remaining, blockPoolUsed); + dn.updateHeartbeat( + BlockManagerTestUtil.getStorageReportsForDatanode(dn), + dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + } + + abstract DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf); + + @Before + public void setupCluster() throws Exception { + Configuration conf = new HdfsConfiguration(); + dataNodes = getDatanodeDescriptors(conf); + + FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + new File(baseDir, "name").getPath()); + conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + blockPlacementPolicy); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); + DFSTestUtil.formatNameNode(conf); + namenode = new NameNode(conf); + nameNodeRpc = namenode.getRpcServer(); + + final BlockManager bm = namenode.getNamesystem().getBlockManager(); + replicator = bm.getBlockPlacementPolicy(); + cluster = bm.getDatanodeManager().getNetworkTopology(); + dnManager = bm.getDatanodeManager(); + // construct network topology + for (int i=0; i < dataNodes.length; i++) { + cluster.add(dataNodes[i]); + //bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]); + bm.getDatanodeManager().getHeartbeatManager().addDatanode( + dataNodes[i]); + } + updateHeartbeatWithUsage(); + } + + void updateHeartbeatWithUsage() { + for (int i=0; i < dataNodes.length; i++) { + updateHeartbeatWithUsage(dataNodes[i], + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
0L, + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); + } + } + + @After + public void tearDown() throws Exception { + namenode.stop(); + } + + boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) { + return isOnSameRack(left, right.getDatanodeDescriptor()); + } + + boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) { + return cluster.isOnSameRack(left.getDatanodeDescriptor(), right); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas) { + return chooseTarget(numOfReplicas, dataNodes[0]); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer) { + return chooseTarget(numOfReplicas, writer, + new ArrayList()); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + List chosenNodes) { + return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, List chosenNodes) { + return chooseTarget(numOfReplicas, writer, chosenNodes, null); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + List chosenNodes, Set excludedNodes) { + return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, + excludedNodes); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, List chosenNodes, + Set excludedNodes) { + return replicator.chooseTarget(filename, numOfReplicas, writer, + chosenNodes, false, excludedNodes, BLOCK_SIZE, + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 90bc1b0ffdc..2ddb1ec0977 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -26,7 +26,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -39,7 +38,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -55,52 +53,40 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; 
+import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -public class TestReplicationPolicy { - { - GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL); - } +@RunWith(Parameterized.class) +public class TestReplicationPolicy extends BaseReplicationPolicyTest { - private static final int BLOCK_SIZE = 1024; - private static final int NUM_OF_DATANODES = 6; - private static NetworkTopology cluster; - private static NameNode namenode; - private static BlockPlacementPolicy replicator; private static final String filename = "/dummyfile.txt"; - private static DatanodeDescriptor[] dataNodes; - private static DatanodeStorageInfo[] storages; // The interval for marking a datanode as stale, private static final long staleInterval = DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT; @Rule public ExpectedException exception = ExpectedException.none(); - - private static void updateHeartbeatWithUsage(DatanodeDescriptor dn, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) { - dn.getStorageInfos()[0].setUtilizationForTesting( - capacity, dfsUsed, remaining, blockPoolUsed); - dn.updateHeartbeat( - BlockManagerTestUtil.getStorageReportsForDatanode(dn), - dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + + public TestReplicationPolicy(String blockPlacementPolicyClassName) { + this.blockPlacementPolicy = blockPlacementPolicyClassName; } - private static void updateHeartbeatForExtraStorage(long capacity, + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][] { + { BlockPlacementPolicyDefault.class.getName() } }); + } + + private void updateHeartbeatForExtraStorage(long capacity, long dfsUsed, long remaining, long blockPoolUsed) { DatanodeDescriptor dn = dataNodes[5]; dn.getStorageInfos()[1].setUtilizationForTesting( @@ -110,9 +96,19 @@ public class TestReplicationPolicy { 0L, 0L, 0, 0, null); } - @BeforeClass - public static void setupCluster() throws Exception { - Configuration conf = new HdfsConfiguration(); + private void resetHeartbeatForStorages() { + for (int i=0; i < dataNodes.length; i++) { + updateHeartbeatWithUsage(dataNodes[i], + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, + 0, 0); + } + // No available space in the extra storage of dn0 + updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L); + } + + @Override + DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) { final String[] racks = { "/d1/r1", "/d1/r1", @@ -121,59 +117,13 @@ public class TestReplicationPolicy { "/d2/r3", "/d2/r3"}; storages = DFSTestUtil.createDatanodeStorageInfos(racks); - dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); - // create an extra storage for dn5. 
DatanodeStorage extraStorage = new DatanodeStorage( storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL, StorageType.DEFAULT); -/* DatanodeStorageInfo si = new DatanodeStorageInfo( - storages[5].getDatanodeDescriptor(), extraStorage); -*/ BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(), extraStorage); - - FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); - conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); - File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, - new File(baseDir, "name").getPath()); - - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); - DFSTestUtil.formatNameNode(conf); - namenode = new NameNode(conf); - - final BlockManager bm = namenode.getNamesystem().getBlockManager(); - replicator = bm.getBlockPlacementPolicy(); - cluster = bm.getDatanodeManager().getNetworkTopology(); - // construct network topology - for (int i=0; i < NUM_OF_DATANODES; i++) { - cluster.add(dataNodes[i]); - bm.getDatanodeManager().getHeartbeatManager().addDatanode( - dataNodes[i]); - } - resetHeartbeatForStorages(); - } - - private static void resetHeartbeatForStorages() { - for (int i=0; i < NUM_OF_DATANODES; i++) { - updateHeartbeatWithUsage(dataNodes[i], - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); - } - // No available space in the extra storage of dn0 - updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L); - } - - private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) { - return isOnSameRack(left, right.getDatanodeDescriptor()); - } - - private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) { - return cluster.isOnSameRack(left.getDatanodeDescriptor(), right); + return DFSTestUtil.toDatanodeDescriptor(storages); } /** @@ -269,40 +219,6 @@ public class TestReplicationPolicy { resetHeartbeatForStorages(); } - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) { - return chooseTarget(numOfReplicas, dataNodes[0]); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer) { - return chooseTarget(numOfReplicas, writer, - new ArrayList()); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - List chosenNodes) { - return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer, List chosenNodes) { - return chooseTarget(numOfReplicas, writer, chosenNodes, null); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - List chosenNodes, Set excludedNodes) { - return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes); - } - - private static DatanodeStorageInfo[] chooseTarget( - int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - Set excludedNodes) { - return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, - false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); - } - /** * In this testcase, client is dataNodes[0], but the dataNodes[1] is * not allowed to be chosen. 
So the 1st replica should be @@ -555,7 +471,7 @@ public class TestReplicationPolicy { throws Exception { try { namenode.getNamesystem().getBlockManager().getDatanodeManager() - .setNumStaleNodes(NUM_OF_DATANODES); + .setNumStaleNodes(dataNodes.length); testChooseTargetWithMoreThanAvailableNodes(); } finally { namenode.getNamesystem().getBlockManager().getDatanodeManager() @@ -583,8 +499,8 @@ public class TestReplicationPolicy { // try to choose NUM_OF_DATANODES which is more than actually available // nodes. - DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES); - assertEquals(targets.length, NUM_OF_DATANODES - 2); + DatanodeStorageInfo[] targets = chooseTarget(dataNodes.length); + assertEquals(targets.length, dataNodes.length - 2); final List log = appender.getLog(); assertNotNull(log); @@ -1256,7 +1172,7 @@ public class TestReplicationPolicy { // Adding this block will increase its current replication, and that will // remove it from the queue. bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info, - ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]); + ReplicaState.FINALIZED), storages[0]); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java index 8486f7f5a13..f4bd709547c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java @@ -20,85 +20,45 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; -import org.apache.hadoop.hdfs.server.common.StorageInfo; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.test.PathUtils; -import org.apache.hadoop.util.VersionInfo; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -public class TestReplicationPolicyConsiderLoad { +@RunWith(Parameterized.class) +public class TestReplicationPolicyConsiderLoad + extends BaseReplicationPolicyTest { - private static NameNode namenode; - private static DatanodeManager dnManager; - private static List dnrList; - private static DatanodeDescriptor[] dataNodes; - private static DatanodeStorageInfo[] storages; + public 
TestReplicationPolicyConsiderLoad(String blockPlacementPolicy) { + this.blockPlacementPolicy = blockPlacementPolicy; + } - @BeforeClass - public static void setupCluster() throws IOException { - Configuration conf = new HdfsConfiguration(); + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][] { + { BlockPlacementPolicyDefault.class.getName() } }); + } + + @Override + DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) { final String[] racks = { - "/rack1", "/rack1", "/rack1", "/rack2", "/rack2", - "/rack2"}; + "/rack3", + "/rack3"}; storages = DFSTestUtil.createDatanodeStorageInfos(racks); - dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); - FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); - conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); - File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, - new File(baseDir, "name").getPath()); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); - DFSTestUtil.formatNameNode(conf); - namenode = new NameNode(conf); - int blockSize = 1024; - - dnrList = new ArrayList(); - dnManager = namenode.getNamesystem().getBlockManager().getDatanodeManager(); - - // Register DNs - for (int i=0; i < 6; i++) { - DatanodeRegistration dnr = new DatanodeRegistration(dataNodes[i], - new StorageInfo(NodeType.DATA_NODE), new ExportedBlockKeys(), - VersionInfo.getVersion()); - dnrList.add(dnr); - dnManager.registerDatanode(dnr); - dataNodes[i].getStorageInfos()[0].setUtilizationForTesting( - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L, - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L); - dataNodes[i].updateHeartbeat( - BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]), - 0L, 0L, 0, 0, null); - } + return DFSTestUtil.toDatanodeDescriptor(storages); } private final double EPSILON = 0.0001; @@ -110,46 +70,39 @@ public class TestReplicationPolicyConsiderLoad { public void testChooseTargetWithDecomNodes() throws IOException { namenode.getNamesystem().writeLock(); try { - String blockPoolId = namenode.getNamesystem().getBlockPoolId(); - dnManager.handleHeartbeat(dnrList.get(3), + dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[3], BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]), - blockPoolId, dataNodes[3].getCacheCapacity(), - dataNodes[3].getCacheRemaining(), - 2, 0, 0, null); - dnManager.handleHeartbeat(dnrList.get(4), + dataNodes[3].getCacheCapacity(), + dataNodes[3].getCacheUsed(), + 2, 0, null); + dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[4], BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]), - blockPoolId, dataNodes[4].getCacheCapacity(), - dataNodes[4].getCacheRemaining(), - 4, 0, 0, null); - dnManager.handleHeartbeat(dnrList.get(5), + dataNodes[4].getCacheCapacity(), + dataNodes[4].getCacheUsed(), + 4, 0, null); + dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[5], BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]), - blockPoolId, dataNodes[5].getCacheCapacity(), - dataNodes[5].getCacheRemaining(), - 4, 0, 0, null); + dataNodes[5].getCacheCapacity(), + dataNodes[5].getCacheUsed(), + 4, 0, null); + // value in the above heartbeats final int load = 2 + 4 + 4; - FSNamesystem fsn = 
namenode.getNamesystem(); assertEquals((double)load/6, dnManager.getFSClusterStats() .getInServiceXceiverAverage(), EPSILON); // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget() // returns false for (int i = 0; i < 3; i++) { - DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i)); + DatanodeDescriptor d = dataNodes[i]; dnManager.getDecomManager().startDecommission(d); d.setDecommissioned(); } assertEquals((double)load/3, dnManager.getFSClusterStats() .getInServiceXceiverAverage(), EPSILON); - // update references of writer DN to update the de-commissioned state - List liveNodes = new ArrayList(); - dnManager.fetchDatanodes(liveNodes, null, false); - DatanodeDescriptor writerDn = null; - if (liveNodes.contains(dataNodes[0])) { - writerDn = liveNodes.get(liveNodes.indexOf(dataNodes[0])); - } + DatanodeDescriptor writerDn = dataNodes[0]; // Call chooseTarget() DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() @@ -170,10 +123,4 @@ public class TestReplicationPolicyConsiderLoad { namenode.getNamesystem().writeUnlock(); } } - - @AfterClass - public static void teardownCluster() { - if (namenode != null) namenode.stop(); - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index e9739253fd2..85598ca1de6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -32,38 +31,25 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; import org.apache.hadoop.net.Node; -import org.apache.hadoop.test.PathUtils; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -public class TestReplicationPolicyWithNodeGroup { - private static final int BLOCK_SIZE = 1024; - private static final int NUM_OF_DATANODES = 8; - private static final int NUM_OF_DATANODES_BOUNDARY = 6; - private static final int NUM_OF_DATANODES_MORE_TARGETS = 12; - private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6; - private final Configuration CONF = new HdfsConfiguration(); - private NetworkTopology cluster; - private NameNode namenode; - private BlockPlacementPolicy replicator; - private static final String filename = "/dummyfile.txt"; +public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest { + public TestReplicationPolicyWithNodeGroup() { + this.blockPlacementPolicy = 
BlockPlacementPolicyWithNodeGroup.class.getName(); + } - private static final DatanodeStorageInfo[] storages; - private static final DatanodeDescriptor[] dataNodes; - static { + @Override + DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) { + conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, + NetworkTopologyWithNodeGroup.class.getName()); final String[] racks = { "/d1/r1/n1", "/d1/r1/n1", @@ -75,7 +61,7 @@ public class TestReplicationPolicyWithNodeGroup { "/d2/r3/n6" }; storages = DFSTestUtil.createDatanodeStorageInfos(racks); - dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); + return DFSTestUtil.toDatanodeDescriptor(storages); } private static final DatanodeStorageInfo[] storagesInBoundaryCase; @@ -142,60 +128,7 @@ public class TestReplicationPolicyWithNodeGroup { dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies); }; - - @Before - public void setUp() throws Exception { - FileSystem.setDefaultUri(CONF, "hdfs://localhost:0"); - CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); - // Set properties to make HDFS aware of NodeGroup. - CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, - BlockPlacementPolicyWithNodeGroup.class.getName()); - CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, - NetworkTopologyWithNodeGroup.class.getName()); - - CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); - - File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class); - - CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, - new File(baseDir, "name").getPath()); - - DFSTestUtil.formatNameNode(CONF); - namenode = new NameNode(CONF); - final BlockManager bm = namenode.getNamesystem().getBlockManager(); - replicator = bm.getBlockPlacementPolicy(); - cluster = bm.getDatanodeManager().getNetworkTopology(); - // construct network topology - for(int i=0; i()); - } - - private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - List chosenNodes) { - return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); - } - - private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer, List chosenNodes) { - return chooseTarget(numOfReplicas, writer, chosenNodes, null); - } - - private DatanodeStorageInfo[] chooseTarget( - int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - Set excludedNodes) { - return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, - false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); - } - /** * In this testcase, client is dataNodes[0]. 
So the 1st replica should be * placed on dataNodes[0], the 2nd replica should be placed on @@ -467,7 +367,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testChooseTarget5() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); DatanodeStorageInfo[] targets; targets = chooseTarget(0, NODE); assertEquals(targets.length, 0); @@ -514,7 +414,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testRereplicate1() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); List chosenNodes = new ArrayList(); chosenNodes.add(storages[0]); DatanodeStorageInfo[] targets; @@ -547,7 +447,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testRereplicate2() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); List chosenNodes = new ArrayList(); chosenNodes.add(storages[0]); chosenNodes.add(storages[1]); @@ -575,7 +475,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testRereplicate3() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); List chosenNodes = new ArrayList(); chosenNodes.add(storages[0]); chosenNodes.add(storages[3]); @@ -671,19 +571,14 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testChooseTargetsOnBoundaryTopology() throws Exception { - for(int i=0; i
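
Illustrative note (not part of the patch): after this refactoring, a new block-placement-policy test only needs to extend BaseReplicationPolicyTest, pass the policy class name through the Parameterized runner, and override getDatanodeDescriptors(). The sketch below is a minimal, hypothetical example assuming the base class exactly as added above; the class name TestReplicationPolicyExample and its rack topology are invented for illustration and do not appear in this change.

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import static org.junit.Assert.assertEquals;

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    // Hypothetical subclass, shown only to illustrate how the refactored
    // BaseReplicationPolicyTest is meant to be reused.
    @RunWith(Parameterized.class)
    public class TestReplicationPolicyExample extends BaseReplicationPolicyTest {

      public TestReplicationPolicyExample(String blockPlacementPolicyClassName) {
        // The base class reads this field in setupCluster() and sets
        // DFS_BLOCK_REPLICATOR_CLASSNAME_KEY from it before starting the NameNode.
        this.blockPlacementPolicy = blockPlacementPolicyClassName;
      }

      @Parameterized.Parameters
      public static Iterable<Object[]> data() {
        // Any BlockPlacementPolicy implementation can be listed here; each entry
        // runs the whole test class once against that policy.
        return Arrays.asList(new Object[][] {
            { BlockPlacementPolicyDefault.class.getName() } });
      }

      @Override
      DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
        // Hypothetical two-rack, four-node topology; real subclasses supply
        // whatever layout their placement policy needs.
        final String[] racks = { "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2" };
        storages = DFSTestUtil.createDatanodeStorageInfos(racks);
        return DFSTestUtil.toDatanodeDescriptor(storages);
      }

      @Test
      public void testChooseOneTarget() {
        // chooseTarget() and the datanode heartbeat setup come from the base class.
        DatanodeStorageInfo[] targets = chooseTarget(1);
        assertEquals(1, targets.length);
      }
    }

Because the policy class name is a runner parameter rather than hard-coded in setup, the same test methods can be exercised against multiple placement policies simply by adding entries to data(), which is the point of moving the cluster setup and chooseTarget() helpers into the shared base class.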