From f8fc96a66ea3cbd41a3915c4546ff816451cf9db Mon Sep 17 00:00:00 2001
From: Rakesh Radhakrishnan
Date: Fri, 20 Jan 2017 21:37:51 +0530
Subject: [PATCH] HDFS-11309. [SPS]: chooseTargetTypeInSameNode should pass
 accurate block size to chooseStorage4Block while choosing target.
 Contributed by Uma Maheswara Rao G

---
 .../namenode/StoragePolicySatisfier.java      |  31 ++---
 .../namenode/TestStoragePolicySatisfier.java  | 108 +++++++++++++++---
 2 files changed, 110 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 3b19833c363..1c48910ed7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -403,24 +403,25 @@ public class StoragePolicySatisfier implements Runnable {
     List<StorageType> sourceStorageTypes = new ArrayList<>();
     List<DatanodeInfo> targetNodes = new ArrayList<>();
     List<StorageType> targetStorageTypes = new ArrayList<>();
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+    List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
 
     // Looping over all the source node locations and choose the target
     // storage within same node if possible. This is done separately to
     // avoid choosing a target which already has this block.
     for (int i = 0; i < sourceWithStorageList.size(); i++) {
       StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
-      StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
+      StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
           existingTypeNodePair.dn, expected);
       if (chosenTarget != null) {
         sourceNodes.add(existingTypeNodePair.dn);
         sourceStorageTypes.add(existingTypeNodePair.storageType);
         targetNodes.add(chosenTarget.dn);
         targetStorageTypes.add(chosenTarget.storageType);
-        chosenNodes.add(chosenTarget.dn);
         expected.remove(chosenTarget.storageType);
         // TODO: We can increment scheduled block count for this node?
       }
+      // To avoid choosing this excludeNodes as targets later
+      excludeNodes.add(existingTypeNodePair.dn);
     }
 
     // Looping over all the source node locations. Choose a remote target
@@ -437,28 +438,28 @@ public class StoragePolicySatisfier implements Runnable {
           .getNetworkTopology().isNodeGroupAware()) {
         chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
             expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
-            chosenNodes);
+            excludeNodes);
       }
 
       // Then, match nodes on the same rack
       if (chosenTarget == null) {
         chosenTarget =
             chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
-                Matcher.SAME_RACK, locsForExpectedStorageTypes, chosenNodes);
+                Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes);
       }
 
       if (chosenTarget == null) {
         chosenTarget =
             chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
-                Matcher.ANY_OTHER, locsForExpectedStorageTypes, chosenNodes);
+                Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
       }
       if (null != chosenTarget) {
         sourceNodes.add(existingTypeNodePair.dn);
         sourceStorageTypes.add(existingTypeNodePair.storageType);
         targetNodes.add(chosenTarget.dn);
         targetStorageTypes.add(chosenTarget.storageType);
-        chosenNodes.add(chosenTarget.dn);
         expected.remove(chosenTarget.storageType);
+        excludeNodes.add(chosenTarget.dn);
         // TODO: We can increment scheduled block count for this node?
       } else {
         LOG.warn(
@@ -554,14 +555,18 @@ public class StoragePolicySatisfier implements Runnable {
   /**
    * Choose the target storage within same datanode if possible.
    *
-   * @param source source datanode
-   * @param targetTypes list of target storage types
+   * @param block
+   *          - block info
+   * @param source
+   *          - source datanode
+   * @param targetTypes
+   *          - list of target storage types
    */
-  private StorageTypeNodePair chooseTargetTypeInSameNode(
+  private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
       DatanodeDescriptor source, List<StorageType> targetTypes) {
     for (StorageType t : targetTypes) {
       DatanodeStorageInfo chooseStorage4Block =
-          source.chooseStorage4Block(t, 0);
+          source.chooseStorage4Block(t, block.getNumBytes());
       if (chooseStorage4Block != null) {
         return new StorageTypeNodePair(t, source);
       }
@@ -572,7 +577,7 @@ public class StoragePolicySatisfier implements Runnable {
   private StorageTypeNodePair chooseTarget(Block block,
       DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
       StorageTypeNodeMap locsForExpectedStorageTypes,
-      List<DatanodeDescriptor> chosenNodes) {
+      List<DatanodeDescriptor> excludeNodes) {
     for (StorageType t : targetTypes) {
       List<DatanodeDescriptor> nodesWithStorages =
           locsForExpectedStorageTypes.getNodesWithStorages(t);
@@ -581,7 +586,7 @@ public class StoragePolicySatisfier implements Runnable {
       }
       Collections.shuffle(nodesWithStorages);
       for (DatanodeDescriptor target : nodesWithStorages) {
-        if (!chosenNodes.contains(target) && matcher.match(
+        if (!excludeNodes.contains(target) && matcher.match(
             blockManager.getDatanodeManager().getNetworkTopology(), source,
             target)) {
           if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 1c53894908c..de73e8b7c24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,12 +18,14 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.junit.Assert.assertNull;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -57,6 +60,8 @@ import com.google.common.base.Supplier;
  * moved and finding its suggested target locations to move.
  */
 public class TestStoragePolicySatisfier {
+  private static final String ONE_SSD = "ONE_SSD";
+  private static final String COLD = "COLD";
   private static final Logger LOG =
       LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
   private final Configuration config = new HdfsConfiguration();
@@ -93,7 +98,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -151,7 +156,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -184,7 +189,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -232,7 +237,7 @@ public class TestStoragePolicySatisfier {
       List<Long> blockCollectionIds = new ArrayList<>();
       // Change policy to ONE_SSD
       for (String fileName : files) {
-        dfs.setStoragePolicy(new Path(fileName), "ONE_SSD");
+        dfs.setStoragePolicy(new Path(fileName), ONE_SSD);
         INode inode = namesystem.getFSDirectory().getINode(fileName);
         blockCollectionIds.add(inode.getId());
       }
@@ -274,12 +279,12 @@ public class TestStoragePolicySatisfier {
       HdfsAdmin hdfsAdmin =
           new HdfsAdmin(FileSystem.getDefaultUri(config), config);
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
 
       StorageType[][] newtypes =
-          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE}};
 
       startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
@@ -314,7 +319,7 @@ public class TestStoragePolicySatisfier {
       writeContent(subFile2);
 
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(subDir), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(subDir), ONE_SSD);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@@ -418,7 +423,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -463,7 +468,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -533,7 +538,7 @@ public class TestStoragePolicySatisfier {
       final String file1 = createFileAndSimulateFavoredNodes(2);
 
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file1), "COLD");
+      dfs.setStoragePolicy(new Path(file1), COLD);
       FSNamesystem namesystem =
           hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file1);
@@ -594,7 +599,7 @@ public class TestStoragePolicySatisfier {
       writeContent(file, (short) 5);
 
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), COLD);
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -633,7 +638,7 @@ public class TestStoragePolicySatisfier {
       writeContent(file);
 
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -688,6 +693,77 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests that movements should not be assigned when there is no space in
+   * target DN.
+   */
+  @Test(timeout = 300000)
+  public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
+      throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.DISK}};
+    config.setLong("dfs.block.size", 2 * DEFAULT_BLOCK_SIZE);
+    long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
+    try {
+      hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
+          storagesPerDatanode, dnCapacity);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(file);
+
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(file), ONE_SSD);
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+      Path filePath = new Path("/testChooseInSameDatanode");
+      final FSDataOutputStream out =
+          dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
+      try {
+        dfs.setStoragePolicy(filePath, ONE_SSD);
+        // Try to fill up SSD part by writing content
+        long remaining = dfs.getStatus().getRemaining() / (3 * 2);
+        for (int i = 0; i < remaining; i++) {
+          out.write(i);
+        }
+      } finally {
+        out.close();
+      }
+      hdfsCluster.triggerHeartbeats();
+      ArrayList<DataNode> dataNodes = hdfsCluster.getDataNodes();
+      // Temporarily disable heart beats, so that we can assert whether any
+      // items schedules for DNs even though DN's does not have space to write.
+      // Disabling heart beats can keep scheduled items on DatanodeDescriptor
+      // itself.
+      for (DataNode dataNode : dataNodes) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
+      }
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+
+      // Wait for items to be processed
+      waitForAttemptedItems(1, 30000);
+
+      // Make sure no items assigned for movements
+      Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
+          .getBlockManager().getDatanodeManager().getDatanodes();
+      for (DatanodeDescriptor dd : dns) {
+        assertNull(dd.getBlocksToMoveStorages());
+      }
+
+      // Enable heart beats now
+      for (DataNode dataNode : dataNodes) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, false);
+      }
+      hdfsCluster.triggerHeartbeats();
+
+      DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
@@ -769,8 +845,8 @@ public class TestStoragePolicySatisfier {
     // write to DISK
     final FSDataOutputStream out =
         dfs.create(new Path(fileName), replicatonFactor);
-    for (int i = 0; i < 1000; i++) {
-      out.writeChars("t");
+    for (int i = 0; i < 1024; i++) {
+      out.write(i);
     }
     out.close();
   }
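
Note (not part of the patch): the heart of the change is the second argument passed to
DatanodeDescriptor#chooseStorage4Block. The old code asked for a storage of the desired
type with room for 0 bytes, so an SSD with no free space on the source datanode could
still be picked as a same-node target; passing the real block size makes the check honor
remaining capacity, which testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace
above exercises. A minimal illustrative sketch of the patched selection loop follows; for
brevity it returns the chosen StorageType instead of the private StorageTypeNodePair
helper that the real method in StoragePolicySatisfier returns.

import java.util.List;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

class SameNodeTargetSketch {
  /**
   * Returns the first target storage type on the source datanode that can
   * hold the block, or null when no local storage has enough remaining space.
   */
  static StorageType chooseTargetTypeInSameNode(Block block,
      DatanodeDescriptor source, List<StorageType> targetTypes) {
    for (StorageType t : targetTypes) {
      // Passing block.getNumBytes() (instead of 0) lets chooseStorage4Block
      // reject storages whose remaining space is smaller than the block.
      DatanodeStorageInfo storage =
          source.chooseStorage4Block(t, block.getNumBytes());
      if (storage != null) {
        return t;
      }
    }
    return null;
  }
}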