diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2265e1de399..34ddb600dfe 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -247,6 +247,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2690. Web-page for FifoScheduler. (Eric Payne via acmurthy) + MAPREDUCE-2711. Update TestBlockPlacementPolicyRaid for the new namesystem + and block management APIs. (szetszwo) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java b/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java index 07331ef8d84..dd9d012493c 100644 --- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java +++ b/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,475 +41,479 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType; -import org.apache.hadoop.hdfs.server.namenode.*; +import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; +import org.apache.hadoop.hdfs.server.namenode.NameNodeRaidTestUtil; +import org.apache.hadoop.hdfs.server.namenode.NameNodeRaidUtil; +import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.raid.RaidNode; +import org.junit.Assert; import org.junit.Test; public class TestBlockPlacementPolicyRaid { - @Test - public void testFoo() { + private Configuration conf = null; + private MiniDFSCluster cluster = null; + private FSNamesystem namesystem = null; + private BlockManager blockManager; + private NetworkTopology networktopology; + private BlockPlacementPolicyRaid policy = null; + private FileSystem fs = null; + String[] rack1 = {"/rack1"}; + String[] rack2 = {"/rack2"}; + String[] host1 = {"host1.rack1.com"}; + String[] host2 = {"host2.rack2.com"}; + String xorPrefix = null; + String raidTempPrefix = null; + String raidrsTempPrefix = null; + String raidrsHarTempPrefix = null; + + final static Log LOG = + LogFactory.getLog(TestBlockPlacementPolicyRaid.class); + + protected void setupCluster() throws IOException { + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); + conf.set("dfs.replication.pending.timeout.sec", "2"); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L); + conf.set("dfs.block.replicator.classname", + BlockPlacementPolicyRaid.class.getName()); + conf.set(RaidNode.STRIPE_LENGTH_KEY, "2"); + conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3"); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); + // start the cluster with one datanode first + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1). + format(true).racks(rack1).hosts(host1).build(); + cluster.waitActive(); + namesystem = cluster.getNameNode().getNamesystem(); + blockManager = namesystem.getBlockManager(); + networktopology = blockManager.getDatanodeManager().getNetworkTopology(); + + Assert.assertTrue("BlockPlacementPolicy type is not correct.", + blockManager.getBlockPlacementPolicy() instanceof BlockPlacementPolicyRaid); + policy = (BlockPlacementPolicyRaid)blockManager.getBlockPlacementPolicy(); + fs = cluster.getFileSystem(); + xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath(); + raidTempPrefix = RaidNode.xorTempPrefix(conf); + raidrsTempPrefix = RaidNode.rsTempPrefix(conf); + raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf); + } + + /** + * Test that the parity files will be placed at the good locations when we + * create them. + */ + @Test + public void testChooseTargetForRaidFile() throws IOException { + setupCluster(); + try { + String src = "/dir/file"; + String parity = raidrsTempPrefix + src; + DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L); + DFSTestUtil.waitReplication(fs, new Path(src), (short)1); + refreshPolicy(); + setBlockPlacementPolicy(namesystem, policy); + // start 3 more datanodes + String[] racks = {"/rack2", "/rack2", "/rack2", + "/rack2", "/rack2", "/rack2"}; + String[] hosts = + {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", + "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; + cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); + int numBlocks = 6; + DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L); + DFSTestUtil.waitReplication(fs, new Path(parity), (short)2); + FileStatus srcStat = fs.getFileStatus(new Path(src)); + BlockLocation[] srcLoc = + fs.getFileBlockLocations(srcStat, 0, srcStat.getLen()); + FileStatus parityStat = fs.getFileStatus(new Path(parity)); + BlockLocation[] parityLoc = + fs.getFileBlockLocations(parityStat, 0, parityStat.getLen()); + int parityLen = RaidNode.rsParityLength(conf); + for (int i = 0; i < numBlocks / parityLen; i++) { + Set locations = new HashSet(); + for (int j = 0; j < srcLoc.length; j++) { + String [] names = srcLoc[j].getNames(); + for (int k = 0; k < names.length; k++) { + LOG.info("Source block location: " + names[k]); + locations.add(names[k]); + } + } + for (int j = 0 ; j < parityLen; j++) { + String[] names = parityLoc[j + i * parityLen].getNames(); + for (int k = 0; k < names.length; k++) { + LOG.info("Parity block location: " + names[k]); + Assert.assertTrue(locations.add(names[k])); + } + } + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test that the har parity files will be placed at the good locations when we + * create them. + */ + @Test + public void testChooseTargetForHarRaidFile() throws IOException { + setupCluster(); + try { + String[] racks = {"/rack2", "/rack2", "/rack2", + "/rack2", "/rack2", "/rack2"}; + String[] hosts = + {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", + "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; + cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); + String harParity = raidrsHarTempPrefix + "/dir/file"; + int numBlocks = 11; + DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L); + DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1); + FileStatus stat = fs.getFileStatus(new Path(harParity)); + BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen()); + int rsParityLength = RaidNode.rsParityLength(conf); + for (int i = 0; i < numBlocks - rsParityLength; i++) { + Set locations = new HashSet(); + for (int j = 0; j < rsParityLength; j++) { + for (int k = 0; k < loc[i + j].getNames().length; k++) { + // verify that every adjacent 4 blocks are on differnt nodes + String name = loc[i + j].getNames()[k]; + LOG.info("Har Raid block location: " + name); + Assert.assertTrue(locations.add(name)); + } + } + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test BlockPlacementPolicyRaid.CachedLocatedBlocks + * Verify that the results obtained from cache is the same as + * the results obtained directly + */ + @Test + public void testCachedBlocks() throws IOException { + setupCluster(); + try { + String file1 = "/dir/file1"; + String file2 = "/dir/file2"; + DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); + // test blocks cache + CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem); + verifyCachedBlocksResult(cachedBlocks, namesystem, file1); + verifyCachedBlocksResult(cachedBlocks, namesystem, file1); + verifyCachedBlocksResult(cachedBlocks, namesystem, file2); + verifyCachedBlocksResult(cachedBlocks, namesystem, file2); + try { + Thread.sleep(1200L); + } catch (InterruptedException e) { + } + verifyCachedBlocksResult(cachedBlocks, namesystem, file2); + verifyCachedBlocksResult(cachedBlocks, namesystem, file1); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test BlockPlacementPolicyRaid.CachedFullPathNames + * Verify that the results obtained from cache is the same as + * the results obtained directly + */ + @Test + public void testCachedPathNames() throws IOException { + setupCluster(); + try { + String file1 = "/dir/file1"; + String file2 = "/dir/file2"; + DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); + // test full path cache + CachedFullPathNames cachedFullPathNames = + new CachedFullPathNames(namesystem); + final FSInodeInfo[] inodes = NameNodeRaidTestUtil.getFSInodeInfo( + namesystem, file1, file2); + + verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]); + verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]); + verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]); + verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]); + try { + Thread.sleep(1200L); + } catch (InterruptedException e) { + } + verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]); + verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** + * Test the result of getCompanionBlocks() on the unraided files + */ + @Test + public void testGetCompanionBLocks() throws IOException { + setupCluster(); + try { + String file1 = "/dir/file1"; + String file2 = "/raid/dir/file2"; + String file3 = "/raidrs/dir/file3"; + // Set the policy to default policy to place the block in the default way + setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( + conf, namesystem, networktopology)); + DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L); + Collection companionBlocks; + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock()); + Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock()); + Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock()); + Assert.assertEquals(1, companionBlocks.size()); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock()); + Assert.assertEquals(1, companionBlocks.size()); + + int rsParityLength = RaidNode.rsParityLength(conf); + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock()); + Assert.assertEquals(rsParityLength, companionBlocks.size()); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock()); + Assert.assertEquals(rsParityLength, companionBlocks.size()); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock()); + Assert.assertEquals(2, companionBlocks.size()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + static void setBlockPlacementPolicy( + FSNamesystem namesystem, BlockPlacementPolicy policy) { + namesystem.writeLock(); + try { + namesystem.getBlockManager().setBlockPlacementPolicy(policy); + } finally { + namesystem.writeUnlock(); + } + } + + /** + * Test BlockPlacementPolicyRaid actually deletes the correct replica. + * Start 2 datanodes and create 1 source file and its parity file. + * 1) Start host1, create the parity file with replication 1 + * 2) Start host2, create the source file with replication 2 + * 3) Set repliation of source file to 1 + * Verify that the policy should delete the block with more companion blocks. + */ + @Test + public void testDeleteReplica() throws IOException { + setupCluster(); + try { + // Set the policy to default policy to place the block in the default way + setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( + conf, namesystem, networktopology)); + DatanodeDescriptor datanode1 = blockManager.getDatanodeManager( + ).getDatanodeCyclicIteration("").iterator().next().getValue(); + String source = "/dir/file"; + String parity = xorPrefix + source; + + final Path parityPath = new Path(parity); + DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L); + DFSTestUtil.waitReplication(fs, parityPath, (short)1); + + // start one more datanode + cluster.startDataNodes(conf, 1, true, null, rack2, host2, null); + DatanodeDescriptor datanode2 = null; + for(Map.Entry e : blockManager.getDatanodeManager( + ).getDatanodeCyclicIteration("")) { + final DatanodeDescriptor d = e.getValue(); + if (!d.getName().equals(datanode1.getName())) { + datanode2 = d; + } + } + Assert.assertTrue(datanode2 != null); + cluster.waitActive(); + final Path sourcePath = new Path(source); + DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L); + DFSTestUtil.waitReplication(fs, sourcePath, (short)2); + + refreshPolicy(); + Assert.assertEquals(parity, + policy.getParityFile(source)); + Assert.assertEquals(source, + policy.getSourceFile(parity, xorPrefix)); + + List sourceBlocks = getBlocks(namesystem, source); + List parityBlocks = getBlocks(namesystem, parity); + Assert.assertEquals(5, sourceBlocks.size()); + Assert.assertEquals(3, parityBlocks.size()); + + // verify the result of getCompanionBlocks() + Collection companionBlocks; + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(0).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{0, 1}, new int[]{0}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(1).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{0, 1}, new int[]{0}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(2).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{2, 3}, new int[]{1}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(3).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{2, 3}, new int[]{1}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(4).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{4}, new int[]{2}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(0).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{0, 1}, new int[]{0}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(1).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{2, 3}, new int[]{1}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(2).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{4}, new int[]{2}); + + // Set the policy back to raid policy. We have to create a new object + // here to clear the block location cache + refreshPolicy(); + setBlockPlacementPolicy(namesystem, policy); + // verify policy deletes the correct blocks. companion blocks should be + // evenly distributed. + fs.setReplication(sourcePath, (short)1); + DFSTestUtil.waitReplication(fs, sourcePath, (short)1); + Map counters = new HashMap(); + refreshPolicy(); + for (int i = 0; i < parityBlocks.size(); i++) { + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(i).getBlock()); + + counters = BlockPlacementPolicyRaid.countCompanionBlocks( + companionBlocks, false); + Assert.assertTrue(counters.get(datanode1.getName()) >= 1 && + counters.get(datanode1.getName()) <= 2); + Assert.assertTrue(counters.get(datanode1.getName()) + + counters.get(datanode2.getName()) == + companionBlocks.size()); + + counters = BlockPlacementPolicyRaid.countCompanionBlocks( + companionBlocks, true); + Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 && + counters.get(datanode1.getParent().getName()) <= 2); + Assert.assertTrue(counters.get(datanode1.getParent().getName()) + + counters.get(datanode2.getParent().getName()) == + companionBlocks.size()); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + // create a new BlockPlacementPolicyRaid to clear the cache + private void refreshPolicy() { + policy = new BlockPlacementPolicyRaid(); + policy.initialize(conf, namesystem, networktopology); + } + + private void verifyCompanionBlocks(Collection companionBlocks, + List sourceBlocks, List parityBlocks, + int[] sourceBlockIndexes, int[] parityBlockIndexes) { + Set blockSet = new HashSet(); + for (LocatedBlock b : companionBlocks) { + blockSet.add(b.getBlock()); + } + Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length, + blockSet.size()); + for (int index : sourceBlockIndexes) { + Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock())); + } + for (int index : parityBlockIndexes) { + Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock())); + } + } + + private void verifyCachedFullPathNameResult( + CachedFullPathNames cachedFullPathNames, FSInodeInfo inode) + throws IOException { + String res1 = inode.getFullPathName(); + String res2 = cachedFullPathNames.get(inode); + LOG.info("Actual path name: " + res1); + LOG.info("Cached path name: " + res2); + Assert.assertEquals(cachedFullPathNames.get(inode), + inode.getFullPathName()); + } + + private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks, + FSNamesystem namesystem, String file) throws IOException{ + long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); + List res1 = NameNodeRaidUtil.getBlockLocations(namesystem, + file, 0L, len, false, false).getLocatedBlocks(); + List res2 = cachedBlocks.get(file); + for (int i = 0; i < res1.size(); i++) { + LOG.info("Actual block: " + res1.get(i).getBlock()); + LOG.info("Cached block: " + res2.get(i).getBlock()); + Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock()); + } + } + + private Collection getCompanionBlocks( + FSNamesystem namesystem, BlockPlacementPolicyRaid policy, + ExtendedBlock block) throws IOException { + INodeFile inode = blockManager.blocksMap.getINode(block + .getLocalBlock()); + FileType type = policy.getFileType(inode.getFullPathName()); + return policy.getCompanionBlocks(inode.getFullPathName(), type, + block.getLocalBlock()); + } + + private List getBlocks(FSNamesystem namesystem, String file) + throws IOException { + long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); + return NameNodeRaidUtil.getBlockLocations(namesystem, + file, 0, len, false, false).getLocatedBlocks(); } -// private Configuration conf = null; -// private MiniDFSCluster cluster = null; -// private FSNamesystem namesystem = null; -// private BlockPlacementPolicyRaid policy = null; -// private FileSystem fs = null; -// String[] rack1 = {"/rack1"}; -// String[] rack2 = {"/rack2"}; -// String[] host1 = {"host1.rack1.com"}; -// String[] host2 = {"host2.rack2.com"}; -// String xorPrefix = null; -// String raidTempPrefix = null; -// String raidrsTempPrefix = null; -// String raidrsHarTempPrefix = null; -// -// final static Log LOG = -// LogFactory.getLog(TestBlockPlacementPolicyRaid.class); -// -// protected void setupCluster() throws IOException { -// conf = new Configuration(); -// conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); -// conf.set("dfs.replication.pending.timeout.sec", "2"); -// conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L); -// conf.set("dfs.block.replicator.classname", -// "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid"); -// conf.set(RaidNode.STRIPE_LENGTH_KEY, "2"); -// conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3"); -// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); -// // start the cluster with one datanode first -// cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1). -// format(true).racks(rack1).hosts(host1).build(); -// cluster.waitActive(); -// namesystem = cluster.getNameNode().getNamesystem(); -// Assert.assertTrue("BlockPlacementPolicy type is not correct.", -// namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid); -// policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator; -// fs = cluster.getFileSystem(); -// xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath(); -// raidTempPrefix = RaidNode.xorTempPrefix(conf); -// raidrsTempPrefix = RaidNode.rsTempPrefix(conf); -// raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf); -// } -// -// /** -// * Test that the parity files will be placed at the good locations when we -// * create them. -// */ -// @Test -// public void testChooseTargetForRaidFile() throws IOException { -// setupCluster(); -// try { -// String src = "/dir/file"; -// String parity = raidrsTempPrefix + src; -// DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L); -// DFSTestUtil.waitReplication(fs, new Path(src), (short)1); -// refreshPolicy(); -// setBlockPlacementPolicy(namesystem, policy); -// // start 3 more datanodes -// String[] racks = {"/rack2", "/rack2", "/rack2", -// "/rack2", "/rack2", "/rack2"}; -// String[] hosts = -// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", -// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; -// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); -// int numBlocks = 6; -// DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L); -// DFSTestUtil.waitReplication(fs, new Path(parity), (short)2); -// FileStatus srcStat = fs.getFileStatus(new Path(src)); -// BlockLocation[] srcLoc = -// fs.getFileBlockLocations(srcStat, 0, srcStat.getLen()); -// FileStatus parityStat = fs.getFileStatus(new Path(parity)); -// BlockLocation[] parityLoc = -// fs.getFileBlockLocations(parityStat, 0, parityStat.getLen()); -// int parityLen = RaidNode.rsParityLength(conf); -// for (int i = 0; i < numBlocks / parityLen; i++) { -// Set locations = new HashSet(); -// for (int j = 0; j < srcLoc.length; j++) { -// String [] names = srcLoc[j].getNames(); -// for (int k = 0; k < names.length; k++) { -// LOG.info("Source block location: " + names[k]); -// locations.add(names[k]); -// } -// } -// for (int j = 0 ; j < parityLen; j++) { -// String[] names = parityLoc[j + i * parityLen].getNames(); -// for (int k = 0; k < names.length; k++) { -// LOG.info("Parity block location: " + names[k]); -// Assert.assertTrue(locations.add(names[k])); -// } -// } -// } -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// /** -// * Test that the har parity files will be placed at the good locations when we -// * create them. -// */ -// @Test -// public void testChooseTargetForHarRaidFile() throws IOException { -// setupCluster(); -// try { -// String[] racks = {"/rack2", "/rack2", "/rack2", -// "/rack2", "/rack2", "/rack2"}; -// String[] hosts = -// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", -// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; -// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); -// String harParity = raidrsHarTempPrefix + "/dir/file"; -// int numBlocks = 11; -// DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L); -// DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1); -// FileStatus stat = fs.getFileStatus(new Path(harParity)); -// BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen()); -// int rsParityLength = RaidNode.rsParityLength(conf); -// for (int i = 0; i < numBlocks - rsParityLength; i++) { -// Set locations = new HashSet(); -// for (int j = 0; j < rsParityLength; j++) { -// for (int k = 0; k < loc[i + j].getNames().length; k++) { -// // verify that every adjacent 4 blocks are on differnt nodes -// String name = loc[i + j].getNames()[k]; -// LOG.info("Har Raid block location: " + name); -// Assert.assertTrue(locations.add(name)); -// } -// } -// } -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// /** -// * Test BlockPlacementPolicyRaid.CachedLocatedBlocks -// * Verify that the results obtained from cache is the same as -// * the results obtained directly -// */ -// @Test -// public void testCachedBlocks() throws IOException { -// setupCluster(); -// try { -// String file1 = "/dir/file1"; -// String file2 = "/dir/file2"; -// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); -// // test blocks cache -// CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); -// try { -// Thread.sleep(1200L); -// } catch (InterruptedException e) { -// } -// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// /** -// * Test BlockPlacementPolicyRaid.CachedFullPathNames -// * Verify that the results obtained from cache is the same as -// * the results obtained directly -// */ -// @Test -// public void testCachedPathNames() throws IOException { -// setupCluster(); -// try { -// String file1 = "/dir/file1"; -// String file2 = "/dir/file2"; -// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); -// // test full path cache -// CachedFullPathNames cachedFullPathNames = -// new CachedFullPathNames(namesystem); -// FSInodeInfo inode1 = null; -// FSInodeInfo inode2 = null; -// NameNodeRaidTestUtil.readLock(namesystem.dir); -// try { -// inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true); -// inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true); -// } finally { -// NameNodeRaidTestUtil.readUnLock(namesystem.dir); -// } -// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); -// try { -// Thread.sleep(1200L); -// } catch (InterruptedException e) { -// } -// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// /** -// * Test the result of getCompanionBlocks() on the unraided files -// */ -// @Test -// public void testGetCompanionBLocks() throws IOException { -// setupCluster(); -// try { -// String file1 = "/dir/file1"; -// String file2 = "/raid/dir/file2"; -// String file3 = "/raidrs/dir/file3"; -// // Set the policy to default policy to place the block in the default way -// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( -// conf, namesystem, namesystem.clusterMap)); -// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L); -// Collection companionBlocks; -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock()); -// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock()); -// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock()); -// Assert.assertEquals(1, companionBlocks.size()); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock()); -// Assert.assertEquals(1, companionBlocks.size()); -// -// int rsParityLength = RaidNode.rsParityLength(conf); -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock()); -// Assert.assertEquals(rsParityLength, companionBlocks.size()); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock()); -// Assert.assertEquals(rsParityLength, companionBlocks.size()); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock()); -// Assert.assertEquals(2, companionBlocks.size()); -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// static void setBlockPlacementPolicy( -// FSNamesystem namesystem, BlockPlacementPolicy policy) { -// namesystem.writeLock(); -// try { -// namesystem.blockManager.replicator = policy; -// } finally { -// namesystem.writeUnlock(); -// } -// } -// -// /** -// * Test BlockPlacementPolicyRaid actually deletes the correct replica. -// * Start 2 datanodes and create 1 source file and its parity file. -// * 1) Start host1, create the parity file with replication 1 -// * 2) Start host2, create the source file with replication 2 -// * 3) Set repliation of source file to 1 -// * Verify that the policy should delete the block with more companion blocks. -// */ -// @Test -// public void testDeleteReplica() throws IOException { -// setupCluster(); -// try { -// // Set the policy to default policy to place the block in the default way -// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( -// conf, namesystem, namesystem.clusterMap)); -// DatanodeDescriptor datanode1 = -// NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next(); -// String source = "/dir/file"; -// String parity = xorPrefix + source; -// -// final Path parityPath = new Path(parity); -// DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L); -// DFSTestUtil.waitReplication(fs, parityPath, (short)1); -// -// // start one more datanode -// cluster.startDataNodes(conf, 1, true, null, rack2, host2, null); -// DatanodeDescriptor datanode2 = null; -// for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) { -// if (!d.getName().equals(datanode1.getName())) { -// datanode2 = d; -// } -// } -// Assert.assertTrue(datanode2 != null); -// cluster.waitActive(); -// final Path sourcePath = new Path(source); -// DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L); -// DFSTestUtil.waitReplication(fs, sourcePath, (short)2); -// -// refreshPolicy(); -// Assert.assertEquals(parity, -// policy.getParityFile(source)); -// Assert.assertEquals(source, -// policy.getSourceFile(parity, xorPrefix)); -// -// List sourceBlocks = getBlocks(namesystem, source); -// List parityBlocks = getBlocks(namesystem, parity); -// Assert.assertEquals(5, sourceBlocks.size()); -// Assert.assertEquals(3, parityBlocks.size()); -// -// // verify the result of getCompanionBlocks() -// Collection companionBlocks; -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(0).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{0, 1}, new int[]{0}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(1).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{0, 1}, new int[]{0}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(2).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{2, 3}, new int[]{1}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(3).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{2, 3}, new int[]{1}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(4).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{4}, new int[]{2}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(0).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{0, 1}, new int[]{0}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(1).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{2, 3}, new int[]{1}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(2).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{4}, new int[]{2}); -// -// // Set the policy back to raid policy. We have to create a new object -// // here to clear the block location cache -// refreshPolicy(); -// setBlockPlacementPolicy(namesystem, policy); -// // verify policy deletes the correct blocks. companion blocks should be -// // evenly distributed. -// fs.setReplication(sourcePath, (short)1); -// DFSTestUtil.waitReplication(fs, sourcePath, (short)1); -// Map counters = new HashMap(); -// refreshPolicy(); -// for (int i = 0; i < parityBlocks.size(); i++) { -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(i).getBlock()); -// -// counters = BlockPlacementPolicyRaid.countCompanionBlocks( -// companionBlocks, false); -// Assert.assertTrue(counters.get(datanode1.getName()) >= 1 && -// counters.get(datanode1.getName()) <= 2); -// Assert.assertTrue(counters.get(datanode1.getName()) + -// counters.get(datanode2.getName()) == -// companionBlocks.size()); -// -// counters = BlockPlacementPolicyRaid.countCompanionBlocks( -// companionBlocks, true); -// Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 && -// counters.get(datanode1.getParent().getName()) <= 2); -// Assert.assertTrue(counters.get(datanode1.getParent().getName()) + -// counters.get(datanode2.getParent().getName()) == -// companionBlocks.size()); -// } -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// // create a new BlockPlacementPolicyRaid to clear the cache -// private void refreshPolicy() { -// policy = new BlockPlacementPolicyRaid(); -// policy.initialize(conf, namesystem, namesystem.clusterMap); -// } -// -// private void verifyCompanionBlocks(Collection companionBlocks, -// List sourceBlocks, List parityBlocks, -// int[] sourceBlockIndexes, int[] parityBlockIndexes) { -// Set blockSet = new HashSet(); -// for (LocatedBlock b : companionBlocks) { -// blockSet.add(b.getBlock()); -// } -// Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length, -// blockSet.size()); -// for (int index : sourceBlockIndexes) { -// Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock())); -// } -// for (int index : parityBlockIndexes) { -// Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock())); -// } -// } -// -// private void verifyCachedFullPathNameResult( -// CachedFullPathNames cachedFullPathNames, FSInodeInfo inode) -// throws IOException { -// String res1 = inode.getFullPathName(); -// String res2 = cachedFullPathNames.get(inode); -// LOG.info("Actual path name: " + res1); -// LOG.info("Cached path name: " + res2); -// Assert.assertEquals(cachedFullPathNames.get(inode), -// inode.getFullPathName()); -// } -// -// private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks, -// FSNamesystem namesystem, String file) throws IOException{ -// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); -// List res1 = NameNodeRaidUtil.getBlockLocations(namesystem, -// file, 0L, len, false, false).getLocatedBlocks(); -// List res2 = cachedBlocks.get(file); -// for (int i = 0; i < res1.size(); i++) { -// LOG.info("Actual block: " + res1.get(i).getBlock()); -// LOG.info("Cached block: " + res2.get(i).getBlock()); -// Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock()); -// } -// } -// -// private Collection getCompanionBlocks( -// FSNamesystem namesystem, BlockPlacementPolicyRaid policy, -// ExtendedBlock block) throws IOException { -// INodeFile inode = namesystem.blockManager.blocksMap.getINode(block -// .getLocalBlock()); -// FileType type = policy.getFileType(inode.getFullPathName()); -// return policy.getCompanionBlocks(inode.getFullPathName(), type, -// block.getLocalBlock()); -// } -// -// private List getBlocks(FSNamesystem namesystem, String file) -// throws IOException { -// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); -// return NameNodeRaidUtil.getBlockLocations(namesystem, -// file, 0, len, false, false).getLocatedBlocks(); -// } } diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java b/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java index 0cb2ac75444..41960f8a175 100644 --- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java +++ b/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java @@ -17,33 +17,21 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import java.io.*; -import java.util.*; - -import org.apache.hadoop.classification.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.hadoop.hdfs.server.blockmanagement.*; -import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.fs.UnresolvedLinkException; public class NameNodeRaidTestUtil { - public static void readLock(final FSDirectory dir) { + public static FSInodeInfo[] getFSInodeInfo(final FSNamesystem namesystem, + final String... files) throws UnresolvedLinkException { + final FSInodeInfo[] inodes = new FSInodeInfo[files.length]; + final FSDirectory dir = namesystem.dir; dir.readLock(); + try { + for(int i = 0; i < files.length; i++) { + inodes[i] = dir.rootDir.getNode(files[i], true); + } + return inodes; + } finally { + dir.readUnlock(); + } } - - public static void readUnLock(final FSDirectory dir) { - dir.readUnlock(); - } - - public static FSInodeInfo getNode(final FSDirectory dir, - final String src, final boolean resolveLink - ) throws UnresolvedLinkException { - return dir.rootDir.getNode(src, resolveLink); - } - -// public static NavigableMap getDatanodeMap( -// final FSNamesystem namesystem) { -// return namesystem.datanodeMap; -// } } -