From cabec474e0f4c525ba80bd7c8dede1a8d76e4f39 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Tue, 19 Jul 2011 15:54:10 +0000 Subject: [PATCH] Revert r1148400 for MAPREDUCE-2711. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1148416 13f79535-47bb-0310-9956-ffa450edef68 --- .../TestBlockPlacementPolicyRaid.java | 1028 ++++++++--------- 1 file changed, 514 insertions(+), 514 deletions(-) diff --git a/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java b/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java index 593233d5a0c..13fa1c447e4 100644 --- a/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java +++ b/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java @@ -1,514 +1,514 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.hadoop.hdfs.server.blockmanagement; -// -//import java.io.IOException; -//import java.util.Collection; -//import java.util.HashMap; -//import java.util.HashSet; -//import java.util.List; -//import java.util.Map; -//import java.util.Set; -// -//import junit.framework.Assert; -// -//import org.apache.commons.logging.Log; -//import org.apache.commons.logging.LogFactory; -//import org.apache.hadoop.conf.Configuration; -//import org.apache.hadoop.fs.BlockLocation; -//import org.apache.hadoop.fs.FileStatus; -//import org.apache.hadoop.fs.FileSystem; -//import org.apache.hadoop.fs.Path; -//import org.apache.hadoop.hdfs.DFSConfigKeys; -//import org.apache.hadoop.hdfs.DFSTestUtil; -//import org.apache.hadoop.hdfs.MiniDFSCluster; -//import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -//import org.apache.hadoop.hdfs.protocol.LocatedBlock; -//import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames; -//import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks; -//import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType; -//import org.apache.hadoop.hdfs.server.namenode.*; -//import org.apache.hadoop.raid.RaidNode; -//import org.junit.Test; -// -//public class TestBlockPlacementPolicyRaid { -// private Configuration conf = null; -// private MiniDFSCluster cluster = null; -// private FSNamesystem namesystem = null; -// private BlockPlacementPolicyRaid policy = null; -// private FileSystem fs = null; -// String[] rack1 = {"/rack1"}; -// String[] rack2 = {"/rack2"}; -// String[] host1 = {"host1.rack1.com"}; -// String[] host2 = {"host2.rack2.com"}; -// String xorPrefix = null; -// String raidTempPrefix = null; -// String raidrsTempPrefix = null; -// String raidrsHarTempPrefix = null; -// -// final static Log LOG = -// LogFactory.getLog(TestBlockPlacementPolicyRaid.class); -// -// protected void setupCluster() throws IOException { -// conf = new Configuration(); -// conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); -// conf.set("dfs.replication.pending.timeout.sec", "2"); -// conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L); -// conf.set("dfs.block.replicator.classname", -// "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid"); -// conf.set(RaidNode.STRIPE_LENGTH_KEY, "2"); -// conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3"); -// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); -// // start the cluster with one datanode first -// cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1). -// format(true).racks(rack1).hosts(host1).build(); -// cluster.waitActive(); -// namesystem = cluster.getNameNode().getNamesystem(); -// Assert.assertTrue("BlockPlacementPolicy type is not correct.", -// namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid); -// policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator; -// fs = cluster.getFileSystem(); -// xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath(); -// raidTempPrefix = RaidNode.xorTempPrefix(conf); -// raidrsTempPrefix = RaidNode.rsTempPrefix(conf); -// raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf); -// } -// -// /** -// * Test that the parity files will be placed at the good locations when we -// * create them. -// */ -// @Test -// public void testChooseTargetForRaidFile() throws IOException { -// setupCluster(); -// try { -// String src = "/dir/file"; -// String parity = raidrsTempPrefix + src; -// DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L); -// DFSTestUtil.waitReplication(fs, new Path(src), (short)1); -// refreshPolicy(); -// setBlockPlacementPolicy(namesystem, policy); -// // start 3 more datanodes -// String[] racks = {"/rack2", "/rack2", "/rack2", -// "/rack2", "/rack2", "/rack2"}; -// String[] hosts = -// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", -// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; -// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); -// int numBlocks = 6; -// DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L); -// DFSTestUtil.waitReplication(fs, new Path(parity), (short)2); -// FileStatus srcStat = fs.getFileStatus(new Path(src)); -// BlockLocation[] srcLoc = -// fs.getFileBlockLocations(srcStat, 0, srcStat.getLen()); -// FileStatus parityStat = fs.getFileStatus(new Path(parity)); -// BlockLocation[] parityLoc = -// fs.getFileBlockLocations(parityStat, 0, parityStat.getLen()); -// int parityLen = RaidNode.rsParityLength(conf); -// for (int i = 0; i < numBlocks / parityLen; i++) { -// Set locations = new HashSet(); -// for (int j = 0; j < srcLoc.length; j++) { -// String [] names = srcLoc[j].getNames(); -// for (int k = 0; k < names.length; k++) { -// LOG.info("Source block location: " + names[k]); -// locations.add(names[k]); -// } -// } -// for (int j = 0 ; j < parityLen; j++) { -// String[] names = parityLoc[j + i * parityLen].getNames(); -// for (int k = 0; k < names.length; k++) { -// LOG.info("Parity block location: " + names[k]); -// Assert.assertTrue(locations.add(names[k])); -// } -// } -// } -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// /** -// * Test that the har parity files will be placed at the good locations when we -// * create them. -// */ -// @Test -// public void testChooseTargetForHarRaidFile() throws IOException { -// setupCluster(); -// try { -// String[] racks = {"/rack2", "/rack2", "/rack2", -// "/rack2", "/rack2", "/rack2"}; -// String[] hosts = -// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", -// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; -// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); -// String harParity = raidrsHarTempPrefix + "/dir/file"; -// int numBlocks = 11; -// DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L); -// DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1); -// FileStatus stat = fs.getFileStatus(new Path(harParity)); -// BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen()); -// int rsParityLength = RaidNode.rsParityLength(conf); -// for (int i = 0; i < numBlocks - rsParityLength; i++) { -// Set locations = new HashSet(); -// for (int j = 0; j < rsParityLength; j++) { -// for (int k = 0; k < loc[i + j].getNames().length; k++) { -// // verify that every adjacent 4 blocks are on differnt nodes -// String name = loc[i + j].getNames()[k]; -// LOG.info("Har Raid block location: " + name); -// Assert.assertTrue(locations.add(name)); -// } -// } -// } -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// /** -// * Test BlockPlacementPolicyRaid.CachedLocatedBlocks -// * Verify that the results obtained from cache is the same as -// * the results obtained directly -// */ -// @Test -// public void testCachedBlocks() throws IOException { -// setupCluster(); -// try { -// String file1 = "/dir/file1"; -// String file2 = "/dir/file2"; -// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); -// // test blocks cache -// CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); -// try { -// Thread.sleep(1200L); -// } catch (InterruptedException e) { -// } -// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); -// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// /** -// * Test BlockPlacementPolicyRaid.CachedFullPathNames -// * Verify that the results obtained from cache is the same as -// * the results obtained directly -// */ -// @Test -// public void testCachedPathNames() throws IOException { -// setupCluster(); -// try { -// String file1 = "/dir/file1"; -// String file2 = "/dir/file2"; -// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); -// // test full path cache -// CachedFullPathNames cachedFullPathNames = -// new CachedFullPathNames(namesystem); -// FSInodeInfo inode1 = null; -// FSInodeInfo inode2 = null; -// NameNodeRaidTestUtil.readLock(namesystem.dir); -// try { -// inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true); -// inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true); -// } finally { -// NameNodeRaidTestUtil.readUnLock(namesystem.dir); -// } -// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); -// try { -// Thread.sleep(1200L); -// } catch (InterruptedException e) { -// } -// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); -// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// /** -// * Test the result of getCompanionBlocks() on the unraided files -// */ -// @Test -// public void testGetCompanionBLocks() throws IOException { -// setupCluster(); -// try { -// String file1 = "/dir/file1"; -// String file2 = "/raid/dir/file2"; -// String file3 = "/raidrs/dir/file3"; -// // Set the policy to default policy to place the block in the default way -// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( -// conf, namesystem, namesystem.clusterMap)); -// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); -// DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L); -// Collection companionBlocks; -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock()); -// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock()); -// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock()); -// Assert.assertEquals(1, companionBlocks.size()); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock()); -// Assert.assertEquals(1, companionBlocks.size()); -// -// int rsParityLength = RaidNode.rsParityLength(conf); -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock()); -// Assert.assertEquals(rsParityLength, companionBlocks.size()); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock()); -// Assert.assertEquals(rsParityLength, companionBlocks.size()); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock()); -// Assert.assertEquals(2, companionBlocks.size()); -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// static void setBlockPlacementPolicy( -// FSNamesystem namesystem, BlockPlacementPolicy policy) { -// namesystem.writeLock(); -// try { -// namesystem.blockManager.replicator = policy; -// } finally { -// namesystem.writeUnlock(); -// } -// } -// -// /** -// * Test BlockPlacementPolicyRaid actually deletes the correct replica. -// * Start 2 datanodes and create 1 source file and its parity file. -// * 1) Start host1, create the parity file with replication 1 -// * 2) Start host2, create the source file with replication 2 -// * 3) Set repliation of source file to 1 -// * Verify that the policy should delete the block with more companion blocks. -// */ -// @Test -// public void testDeleteReplica() throws IOException { -// setupCluster(); -// try { -// // Set the policy to default policy to place the block in the default way -// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( -// conf, namesystem, namesystem.clusterMap)); -// DatanodeDescriptor datanode1 = -// NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next(); -// String source = "/dir/file"; -// String parity = xorPrefix + source; -// -// final Path parityPath = new Path(parity); -// DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L); -// DFSTestUtil.waitReplication(fs, parityPath, (short)1); -// -// // start one more datanode -// cluster.startDataNodes(conf, 1, true, null, rack2, host2, null); -// DatanodeDescriptor datanode2 = null; -// for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) { -// if (!d.getName().equals(datanode1.getName())) { -// datanode2 = d; -// } -// } -// Assert.assertTrue(datanode2 != null); -// cluster.waitActive(); -// final Path sourcePath = new Path(source); -// DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L); -// DFSTestUtil.waitReplication(fs, sourcePath, (short)2); -// -// refreshPolicy(); -// Assert.assertEquals(parity, -// policy.getParityFile(source)); -// Assert.assertEquals(source, -// policy.getSourceFile(parity, xorPrefix)); -// -// List sourceBlocks = getBlocks(namesystem, source); -// List parityBlocks = getBlocks(namesystem, parity); -// Assert.assertEquals(5, sourceBlocks.size()); -// Assert.assertEquals(3, parityBlocks.size()); -// -// // verify the result of getCompanionBlocks() -// Collection companionBlocks; -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(0).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{0, 1}, new int[]{0}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(1).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{0, 1}, new int[]{0}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(2).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{2, 3}, new int[]{1}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(3).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{2, 3}, new int[]{1}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, sourceBlocks.get(4).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{4}, new int[]{2}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(0).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{0, 1}, new int[]{0}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(1).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{2, 3}, new int[]{1}); -// -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(2).getBlock()); -// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, -// new int[]{4}, new int[]{2}); -// -// // Set the policy back to raid policy. We have to create a new object -// // here to clear the block location cache -// refreshPolicy(); -// setBlockPlacementPolicy(namesystem, policy); -// // verify policy deletes the correct blocks. companion blocks should be -// // evenly distributed. -// fs.setReplication(sourcePath, (short)1); -// DFSTestUtil.waitReplication(fs, sourcePath, (short)1); -// Map counters = new HashMap(); -// refreshPolicy(); -// for (int i = 0; i < parityBlocks.size(); i++) { -// companionBlocks = getCompanionBlocks( -// namesystem, policy, parityBlocks.get(i).getBlock()); -// -// counters = BlockPlacementPolicyRaid.countCompanionBlocks( -// companionBlocks, false); -// Assert.assertTrue(counters.get(datanode1.getName()) >= 1 && -// counters.get(datanode1.getName()) <= 2); -// Assert.assertTrue(counters.get(datanode1.getName()) + -// counters.get(datanode2.getName()) == -// companionBlocks.size()); -// -// counters = BlockPlacementPolicyRaid.countCompanionBlocks( -// companionBlocks, true); -// Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 && -// counters.get(datanode1.getParent().getName()) <= 2); -// Assert.assertTrue(counters.get(datanode1.getParent().getName()) + -// counters.get(datanode2.getParent().getName()) == -// companionBlocks.size()); -// } -// } finally { -// if (cluster != null) { -// cluster.shutdown(); -// } -// } -// } -// -// // create a new BlockPlacementPolicyRaid to clear the cache -// private void refreshPolicy() { -// policy = new BlockPlacementPolicyRaid(); -// policy.initialize(conf, namesystem, namesystem.clusterMap); -// } -// -// private void verifyCompanionBlocks(Collection companionBlocks, -// List sourceBlocks, List parityBlocks, -// int[] sourceBlockIndexes, int[] parityBlockIndexes) { -// Set blockSet = new HashSet(); -// for (LocatedBlock b : companionBlocks) { -// blockSet.add(b.getBlock()); -// } -// Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length, -// blockSet.size()); -// for (int index : sourceBlockIndexes) { -// Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock())); -// } -// for (int index : parityBlockIndexes) { -// Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock())); -// } -// } -// -// private void verifyCachedFullPathNameResult( -// CachedFullPathNames cachedFullPathNames, FSInodeInfo inode) -// throws IOException { -// String res1 = inode.getFullPathName(); -// String res2 = cachedFullPathNames.get(inode); -// LOG.info("Actual path name: " + res1); -// LOG.info("Cached path name: " + res2); -// Assert.assertEquals(cachedFullPathNames.get(inode), -// inode.getFullPathName()); -// } -// -// private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks, -// FSNamesystem namesystem, String file) throws IOException{ -// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); -// List res1 = NameNodeRaidUtil.getBlockLocations(namesystem, -// file, 0L, len, false, false).getLocatedBlocks(); -// List res2 = cachedBlocks.get(file); -// for (int i = 0; i < res1.size(); i++) { -// LOG.info("Actual block: " + res1.get(i).getBlock()); -// LOG.info("Cached block: " + res2.get(i).getBlock()); -// Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock()); -// } -// } -// -// private Collection getCompanionBlocks( -// FSNamesystem namesystem, BlockPlacementPolicyRaid policy, -// ExtendedBlock block) throws IOException { -// INodeFile inode = namesystem.blockManager.blocksMap.getINode(block -// .getLocalBlock()); -// FileType type = policy.getFileType(inode.getFullPathName()); -// return policy.getCompanionBlocks(inode.getFullPathName(), type, -// block.getLocalBlock()); -// } -// -// private List getBlocks(FSNamesystem namesystem, String file) -// throws IOException { -// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); -// return NameNodeRaidUtil.getBlockLocations(namesystem, -// file, 0, len, false, false).getLocatedBlocks(); -// } -//} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType; +import org.apache.hadoop.hdfs.server.namenode.*; +import org.apache.hadoop.raid.RaidNode; +import org.junit.Test; + +public class TestBlockPlacementPolicyRaid { + private Configuration conf = null; + private MiniDFSCluster cluster = null; + private FSNamesystem namesystem = null; + private BlockPlacementPolicyRaid policy = null; + private FileSystem fs = null; + String[] rack1 = {"/rack1"}; + String[] rack2 = {"/rack2"}; + String[] host1 = {"host1.rack1.com"}; + String[] host2 = {"host2.rack2.com"}; + String xorPrefix = null; + String raidTempPrefix = null; + String raidrsTempPrefix = null; + String raidrsHarTempPrefix = null; + + final static Log LOG = + LogFactory.getLog(TestBlockPlacementPolicyRaid.class); + + protected void setupCluster() throws IOException { + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); + conf.set("dfs.replication.pending.timeout.sec", "2"); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L); + conf.set("dfs.block.replicator.classname", + "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid"); + conf.set(RaidNode.STRIPE_LENGTH_KEY, "2"); + conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3"); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); + // start the cluster with one datanode first + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1). + format(true).racks(rack1).hosts(host1).build(); + cluster.waitActive(); + namesystem = cluster.getNameNode().getNamesystem(); + Assert.assertTrue("BlockPlacementPolicy type is not correct.", + namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid); + policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator; + fs = cluster.getFileSystem(); + xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath(); + raidTempPrefix = RaidNode.xorTempPrefix(conf); + raidrsTempPrefix = RaidNode.rsTempPrefix(conf); + raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf); + } + + /** + * Test that the parity files will be placed at the good locations when we + * create them. + */ + @Test + public void testChooseTargetForRaidFile() throws IOException { + setupCluster(); + try { + String src = "/dir/file"; + String parity = raidrsTempPrefix + src; + DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L); + DFSTestUtil.waitReplication(fs, new Path(src), (short)1); + refreshPolicy(); + setBlockPlacementPolicy(namesystem, policy); + // start 3 more datanodes + String[] racks = {"/rack2", "/rack2", "/rack2", + "/rack2", "/rack2", "/rack2"}; + String[] hosts = + {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", + "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; + cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); + int numBlocks = 6; + DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L); + DFSTestUtil.waitReplication(fs, new Path(parity), (short)2); + FileStatus srcStat = fs.getFileStatus(new Path(src)); + BlockLocation[] srcLoc = + fs.getFileBlockLocations(srcStat, 0, srcStat.getLen()); + FileStatus parityStat = fs.getFileStatus(new Path(parity)); + BlockLocation[] parityLoc = + fs.getFileBlockLocations(parityStat, 0, parityStat.getLen()); + int parityLen = RaidNode.rsParityLength(conf); + for (int i = 0; i < numBlocks / parityLen; i++) { + Set locations = new HashSet(); + for (int j = 0; j < srcLoc.length; j++) { + String [] names = srcLoc[j].getNames(); + for (int k = 0; k < names.length; k++) { + LOG.info("Source block location: " + names[k]); + locations.add(names[k]); + } + } + for (int j = 0 ; j < parityLen; j++) { + String[] names = parityLoc[j + i * parityLen].getNames(); + for (int k = 0; k < names.length; k++) { + LOG.info("Parity block location: " + names[k]); + Assert.assertTrue(locations.add(names[k])); + } + } + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test that the har parity files will be placed at the good locations when we + * create them. + */ + @Test + public void testChooseTargetForHarRaidFile() throws IOException { + setupCluster(); + try { + String[] racks = {"/rack2", "/rack2", "/rack2", + "/rack2", "/rack2", "/rack2"}; + String[] hosts = + {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", + "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; + cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); + String harParity = raidrsHarTempPrefix + "/dir/file"; + int numBlocks = 11; + DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L); + DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1); + FileStatus stat = fs.getFileStatus(new Path(harParity)); + BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen()); + int rsParityLength = RaidNode.rsParityLength(conf); + for (int i = 0; i < numBlocks - rsParityLength; i++) { + Set locations = new HashSet(); + for (int j = 0; j < rsParityLength; j++) { + for (int k = 0; k < loc[i + j].getNames().length; k++) { + // verify that every adjacent 4 blocks are on differnt nodes + String name = loc[i + j].getNames()[k]; + LOG.info("Har Raid block location: " + name); + Assert.assertTrue(locations.add(name)); + } + } + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test BlockPlacementPolicyRaid.CachedLocatedBlocks + * Verify that the results obtained from cache is the same as + * the results obtained directly + */ + @Test + public void testCachedBlocks() throws IOException { + setupCluster(); + try { + String file1 = "/dir/file1"; + String file2 = "/dir/file2"; + DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); + // test blocks cache + CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem); + verifyCachedBlocksResult(cachedBlocks, namesystem, file1); + verifyCachedBlocksResult(cachedBlocks, namesystem, file1); + verifyCachedBlocksResult(cachedBlocks, namesystem, file2); + verifyCachedBlocksResult(cachedBlocks, namesystem, file2); + try { + Thread.sleep(1200L); + } catch (InterruptedException e) { + } + verifyCachedBlocksResult(cachedBlocks, namesystem, file2); + verifyCachedBlocksResult(cachedBlocks, namesystem, file1); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test BlockPlacementPolicyRaid.CachedFullPathNames + * Verify that the results obtained from cache is the same as + * the results obtained directly + */ + @Test + public void testCachedPathNames() throws IOException { + setupCluster(); + try { + String file1 = "/dir/file1"; + String file2 = "/dir/file2"; + DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); + // test full path cache + CachedFullPathNames cachedFullPathNames = + new CachedFullPathNames(namesystem); + FSInodeInfo inode1 = null; + FSInodeInfo inode2 = null; + NameNodeRaidTestUtil.readLock(namesystem.dir); + try { + inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true); + inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true); + } finally { + NameNodeRaidTestUtil.readUnLock(namesystem.dir); + } + verifyCachedFullPathNameResult(cachedFullPathNames, inode1); + verifyCachedFullPathNameResult(cachedFullPathNames, inode1); + verifyCachedFullPathNameResult(cachedFullPathNames, inode2); + verifyCachedFullPathNameResult(cachedFullPathNames, inode2); + try { + Thread.sleep(1200L); + } catch (InterruptedException e) { + } + verifyCachedFullPathNameResult(cachedFullPathNames, inode2); + verifyCachedFullPathNameResult(cachedFullPathNames, inode1); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** + * Test the result of getCompanionBlocks() on the unraided files + */ + @Test + public void testGetCompanionBLocks() throws IOException { + setupCluster(); + try { + String file1 = "/dir/file1"; + String file2 = "/raid/dir/file2"; + String file3 = "/raidrs/dir/file3"; + // Set the policy to default policy to place the block in the default way + setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( + conf, namesystem, namesystem.clusterMap)); + DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); + DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L); + Collection companionBlocks; + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock()); + Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock()); + Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock()); + Assert.assertEquals(1, companionBlocks.size()); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock()); + Assert.assertEquals(1, companionBlocks.size()); + + int rsParityLength = RaidNode.rsParityLength(conf); + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock()); + Assert.assertEquals(rsParityLength, companionBlocks.size()); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock()); + Assert.assertEquals(rsParityLength, companionBlocks.size()); + + companionBlocks = getCompanionBlocks( + namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock()); + Assert.assertEquals(2, companionBlocks.size()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + static void setBlockPlacementPolicy( + FSNamesystem namesystem, BlockPlacementPolicy policy) { + namesystem.writeLock(); + try { + namesystem.blockManager.replicator = policy; + } finally { + namesystem.writeUnlock(); + } + } + + /** + * Test BlockPlacementPolicyRaid actually deletes the correct replica. + * Start 2 datanodes and create 1 source file and its parity file. + * 1) Start host1, create the parity file with replication 1 + * 2) Start host2, create the source file with replication 2 + * 3) Set repliation of source file to 1 + * Verify that the policy should delete the block with more companion blocks. + */ + @Test + public void testDeleteReplica() throws IOException { + setupCluster(); + try { + // Set the policy to default policy to place the block in the default way + setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( + conf, namesystem, namesystem.clusterMap)); + DatanodeDescriptor datanode1 = + NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next(); + String source = "/dir/file"; + String parity = xorPrefix + source; + + final Path parityPath = new Path(parity); + DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L); + DFSTestUtil.waitReplication(fs, parityPath, (short)1); + + // start one more datanode + cluster.startDataNodes(conf, 1, true, null, rack2, host2, null); + DatanodeDescriptor datanode2 = null; + for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) { + if (!d.getName().equals(datanode1.getName())) { + datanode2 = d; + } + } + Assert.assertTrue(datanode2 != null); + cluster.waitActive(); + final Path sourcePath = new Path(source); + DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L); + DFSTestUtil.waitReplication(fs, sourcePath, (short)2); + + refreshPolicy(); + Assert.assertEquals(parity, + policy.getParityFile(source)); + Assert.assertEquals(source, + policy.getSourceFile(parity, xorPrefix)); + + List sourceBlocks = getBlocks(namesystem, source); + List parityBlocks = getBlocks(namesystem, parity); + Assert.assertEquals(5, sourceBlocks.size()); + Assert.assertEquals(3, parityBlocks.size()); + + // verify the result of getCompanionBlocks() + Collection companionBlocks; + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(0).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{0, 1}, new int[]{0}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(1).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{0, 1}, new int[]{0}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(2).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{2, 3}, new int[]{1}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(3).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{2, 3}, new int[]{1}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, sourceBlocks.get(4).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{4}, new int[]{2}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(0).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{0, 1}, new int[]{0}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(1).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{2, 3}, new int[]{1}); + + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(2).getBlock()); + verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, + new int[]{4}, new int[]{2}); + + // Set the policy back to raid policy. We have to create a new object + // here to clear the block location cache + refreshPolicy(); + setBlockPlacementPolicy(namesystem, policy); + // verify policy deletes the correct blocks. companion blocks should be + // evenly distributed. + fs.setReplication(sourcePath, (short)1); + DFSTestUtil.waitReplication(fs, sourcePath, (short)1); + Map counters = new HashMap(); + refreshPolicy(); + for (int i = 0; i < parityBlocks.size(); i++) { + companionBlocks = getCompanionBlocks( + namesystem, policy, parityBlocks.get(i).getBlock()); + + counters = BlockPlacementPolicyRaid.countCompanionBlocks( + companionBlocks, false); + Assert.assertTrue(counters.get(datanode1.getName()) >= 1 && + counters.get(datanode1.getName()) <= 2); + Assert.assertTrue(counters.get(datanode1.getName()) + + counters.get(datanode2.getName()) == + companionBlocks.size()); + + counters = BlockPlacementPolicyRaid.countCompanionBlocks( + companionBlocks, true); + Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 && + counters.get(datanode1.getParent().getName()) <= 2); + Assert.assertTrue(counters.get(datanode1.getParent().getName()) + + counters.get(datanode2.getParent().getName()) == + companionBlocks.size()); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + // create a new BlockPlacementPolicyRaid to clear the cache + private void refreshPolicy() { + policy = new BlockPlacementPolicyRaid(); + policy.initialize(conf, namesystem, namesystem.clusterMap); + } + + private void verifyCompanionBlocks(Collection companionBlocks, + List sourceBlocks, List parityBlocks, + int[] sourceBlockIndexes, int[] parityBlockIndexes) { + Set blockSet = new HashSet(); + for (LocatedBlock b : companionBlocks) { + blockSet.add(b.getBlock()); + } + Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length, + blockSet.size()); + for (int index : sourceBlockIndexes) { + Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock())); + } + for (int index : parityBlockIndexes) { + Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock())); + } + } + + private void verifyCachedFullPathNameResult( + CachedFullPathNames cachedFullPathNames, FSInodeInfo inode) + throws IOException { + String res1 = inode.getFullPathName(); + String res2 = cachedFullPathNames.get(inode); + LOG.info("Actual path name: " + res1); + LOG.info("Cached path name: " + res2); + Assert.assertEquals(cachedFullPathNames.get(inode), + inode.getFullPathName()); + } + + private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks, + FSNamesystem namesystem, String file) throws IOException{ + long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); + List res1 = NameNodeRaidUtil.getBlockLocations(namesystem, + file, 0L, len, false, false).getLocatedBlocks(); + List res2 = cachedBlocks.get(file); + for (int i = 0; i < res1.size(); i++) { + LOG.info("Actual block: " + res1.get(i).getBlock()); + LOG.info("Cached block: " + res2.get(i).getBlock()); + Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock()); + } + } + + private Collection getCompanionBlocks( + FSNamesystem namesystem, BlockPlacementPolicyRaid policy, + ExtendedBlock block) throws IOException { + INodeFile inode = namesystem.blockManager.blocksMap.getINode(block + .getLocalBlock()); + FileType type = policy.getFileType(inode.getFullPathName()); + return policy.getCompanionBlocks(inode.getFullPathName(), type, + block.getLocalBlock()); + } + + private List getBlocks(FSNamesystem namesystem, String file) + throws IOException { + long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); + return NameNodeRaidUtil.getBlockLocations(namesystem, + file, 0, len, false, false).getLocatedBlocks(); + } +}