svn merge -c 1167001 from trunk for MAPREDUCE-2711.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1167004 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
42dbcbb4d8
commit
eec8cebd0f
|
@ -247,6 +247,9 @@ Release 0.23.0 - Unreleased
|
||||||
|
|
||||||
MAPREDUCE-2690. Web-page for FifoScheduler. (Eric Payne via acmurthy)
|
MAPREDUCE-2690. Web-page for FifoScheduler. (Eric Payne via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-2711. Update TestBlockPlacementPolicyRaid for the new namesystem
|
||||||
|
and block management APIs. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
||||||
|
|
|
@ -26,8 +26,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -43,475 +41,479 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.*;
|
import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeRaidTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeRaidUtil;
|
||||||
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.raid.RaidNode;
|
import org.apache.hadoop.raid.RaidNode;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestBlockPlacementPolicyRaid {
|
public class TestBlockPlacementPolicyRaid {
|
||||||
|
private Configuration conf = null;
|
||||||
|
private MiniDFSCluster cluster = null;
|
||||||
|
private FSNamesystem namesystem = null;
|
||||||
|
private BlockManager blockManager;
|
||||||
|
private NetworkTopology networktopology;
|
||||||
|
private BlockPlacementPolicyRaid policy = null;
|
||||||
|
private FileSystem fs = null;
|
||||||
|
String[] rack1 = {"/rack1"};
|
||||||
|
String[] rack2 = {"/rack2"};
|
||||||
|
String[] host1 = {"host1.rack1.com"};
|
||||||
|
String[] host2 = {"host2.rack2.com"};
|
||||||
|
String xorPrefix = null;
|
||||||
|
String raidTempPrefix = null;
|
||||||
|
String raidrsTempPrefix = null;
|
||||||
|
String raidrsHarTempPrefix = null;
|
||||||
|
|
||||||
|
final static Log LOG =
|
||||||
|
LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
|
||||||
|
|
||||||
|
protected void setupCluster() throws IOException {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
|
||||||
|
conf.set("dfs.replication.pending.timeout.sec", "2");
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
|
||||||
|
conf.set("dfs.block.replicator.classname",
|
||||||
|
BlockPlacementPolicyRaid.class.getName());
|
||||||
|
conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
|
||||||
|
conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
|
||||||
|
// start the cluster with one datanode first
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
|
||||||
|
format(true).racks(rack1).hosts(host1).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
namesystem = cluster.getNameNode().getNamesystem();
|
||||||
|
blockManager = namesystem.getBlockManager();
|
||||||
|
networktopology = blockManager.getDatanodeManager().getNetworkTopology();
|
||||||
|
|
||||||
|
Assert.assertTrue("BlockPlacementPolicy type is not correct.",
|
||||||
|
blockManager.getBlockPlacementPolicy() instanceof BlockPlacementPolicyRaid);
|
||||||
|
policy = (BlockPlacementPolicyRaid)blockManager.getBlockPlacementPolicy();
|
||||||
|
fs = cluster.getFileSystem();
|
||||||
|
xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
|
||||||
|
raidTempPrefix = RaidNode.xorTempPrefix(conf);
|
||||||
|
raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
|
||||||
|
raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the parity files will be placed at the good locations when we
|
||||||
|
* create them.
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testFoo() {
|
public void testChooseTargetForRaidFile() throws IOException {
|
||||||
|
setupCluster();
|
||||||
|
try {
|
||||||
|
String src = "/dir/file";
|
||||||
|
String parity = raidrsTempPrefix + src;
|
||||||
|
DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
|
||||||
|
DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
|
||||||
|
refreshPolicy();
|
||||||
|
setBlockPlacementPolicy(namesystem, policy);
|
||||||
|
// start 3 more datanodes
|
||||||
|
String[] racks = {"/rack2", "/rack2", "/rack2",
|
||||||
|
"/rack2", "/rack2", "/rack2"};
|
||||||
|
String[] hosts =
|
||||||
|
{"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
|
||||||
|
"host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
|
||||||
|
cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
|
||||||
|
int numBlocks = 6;
|
||||||
|
DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
|
||||||
|
DFSTestUtil.waitReplication(fs, new Path(parity), (short)2);
|
||||||
|
FileStatus srcStat = fs.getFileStatus(new Path(src));
|
||||||
|
BlockLocation[] srcLoc =
|
||||||
|
fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
|
||||||
|
FileStatus parityStat = fs.getFileStatus(new Path(parity));
|
||||||
|
BlockLocation[] parityLoc =
|
||||||
|
fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
|
||||||
|
int parityLen = RaidNode.rsParityLength(conf);
|
||||||
|
for (int i = 0; i < numBlocks / parityLen; i++) {
|
||||||
|
Set<String> locations = new HashSet<String>();
|
||||||
|
for (int j = 0; j < srcLoc.length; j++) {
|
||||||
|
String [] names = srcLoc[j].getNames();
|
||||||
|
for (int k = 0; k < names.length; k++) {
|
||||||
|
LOG.info("Source block location: " + names[k]);
|
||||||
|
locations.add(names[k]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int j = 0 ; j < parityLen; j++) {
|
||||||
|
String[] names = parityLoc[j + i * parityLen].getNames();
|
||||||
|
for (int k = 0; k < names.length; k++) {
|
||||||
|
LOG.info("Parity block location: " + names[k]);
|
||||||
|
Assert.assertTrue(locations.add(names[k]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the har parity files will be placed at the good locations when we
|
||||||
|
* create them.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testChooseTargetForHarRaidFile() throws IOException {
|
||||||
|
setupCluster();
|
||||||
|
try {
|
||||||
|
String[] racks = {"/rack2", "/rack2", "/rack2",
|
||||||
|
"/rack2", "/rack2", "/rack2"};
|
||||||
|
String[] hosts =
|
||||||
|
{"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
|
||||||
|
"host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
|
||||||
|
cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
|
||||||
|
String harParity = raidrsHarTempPrefix + "/dir/file";
|
||||||
|
int numBlocks = 11;
|
||||||
|
DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
|
||||||
|
DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
|
||||||
|
FileStatus stat = fs.getFileStatus(new Path(harParity));
|
||||||
|
BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
|
||||||
|
int rsParityLength = RaidNode.rsParityLength(conf);
|
||||||
|
for (int i = 0; i < numBlocks - rsParityLength; i++) {
|
||||||
|
Set<String> locations = new HashSet<String>();
|
||||||
|
for (int j = 0; j < rsParityLength; j++) {
|
||||||
|
for (int k = 0; k < loc[i + j].getNames().length; k++) {
|
||||||
|
// verify that every adjacent 4 blocks are on differnt nodes
|
||||||
|
String name = loc[i + j].getNames()[k];
|
||||||
|
LOG.info("Har Raid block location: " + name);
|
||||||
|
Assert.assertTrue(locations.add(name));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test BlockPlacementPolicyRaid.CachedLocatedBlocks
|
||||||
|
* Verify that the results obtained from cache is the same as
|
||||||
|
* the results obtained directly
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCachedBlocks() throws IOException {
|
||||||
|
setupCluster();
|
||||||
|
try {
|
||||||
|
String file1 = "/dir/file1";
|
||||||
|
String file2 = "/dir/file2";
|
||||||
|
DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
|
||||||
|
DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
|
||||||
|
// test blocks cache
|
||||||
|
CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
|
||||||
|
verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
|
||||||
|
verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
|
||||||
|
verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
|
||||||
|
verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
|
||||||
|
try {
|
||||||
|
Thread.sleep(1200L);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
|
||||||
|
verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test BlockPlacementPolicyRaid.CachedFullPathNames
|
||||||
|
* Verify that the results obtained from cache is the same as
|
||||||
|
* the results obtained directly
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCachedPathNames() throws IOException {
|
||||||
|
setupCluster();
|
||||||
|
try {
|
||||||
|
String file1 = "/dir/file1";
|
||||||
|
String file2 = "/dir/file2";
|
||||||
|
DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
|
||||||
|
DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
|
||||||
|
// test full path cache
|
||||||
|
CachedFullPathNames cachedFullPathNames =
|
||||||
|
new CachedFullPathNames(namesystem);
|
||||||
|
final FSInodeInfo[] inodes = NameNodeRaidTestUtil.getFSInodeInfo(
|
||||||
|
namesystem, file1, file2);
|
||||||
|
|
||||||
|
verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]);
|
||||||
|
verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]);
|
||||||
|
verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]);
|
||||||
|
verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]);
|
||||||
|
try {
|
||||||
|
Thread.sleep(1200L);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]);
|
||||||
|
verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Test the result of getCompanionBlocks() on the unraided files
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetCompanionBLocks() throws IOException {
|
||||||
|
setupCluster();
|
||||||
|
try {
|
||||||
|
String file1 = "/dir/file1";
|
||||||
|
String file2 = "/raid/dir/file2";
|
||||||
|
String file3 = "/raidrs/dir/file3";
|
||||||
|
// Set the policy to default policy to place the block in the default way
|
||||||
|
setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
|
||||||
|
conf, namesystem, networktopology));
|
||||||
|
DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
|
||||||
|
DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
|
||||||
|
DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
|
||||||
|
Collection<LocatedBlock> companionBlocks;
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
|
||||||
|
Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
|
||||||
|
Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
|
||||||
|
Assert.assertEquals(1, companionBlocks.size());
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
|
||||||
|
Assert.assertEquals(1, companionBlocks.size());
|
||||||
|
|
||||||
|
int rsParityLength = RaidNode.rsParityLength(conf);
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
|
||||||
|
Assert.assertEquals(rsParityLength, companionBlocks.size());
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
|
||||||
|
Assert.assertEquals(rsParityLength, companionBlocks.size());
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
|
||||||
|
Assert.assertEquals(2, companionBlocks.size());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setBlockPlacementPolicy(
|
||||||
|
FSNamesystem namesystem, BlockPlacementPolicy policy) {
|
||||||
|
namesystem.writeLock();
|
||||||
|
try {
|
||||||
|
namesystem.getBlockManager().setBlockPlacementPolicy(policy);
|
||||||
|
} finally {
|
||||||
|
namesystem.writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test BlockPlacementPolicyRaid actually deletes the correct replica.
|
||||||
|
* Start 2 datanodes and create 1 source file and its parity file.
|
||||||
|
* 1) Start host1, create the parity file with replication 1
|
||||||
|
* 2) Start host2, create the source file with replication 2
|
||||||
|
* 3) Set repliation of source file to 1
|
||||||
|
* Verify that the policy should delete the block with more companion blocks.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeleteReplica() throws IOException {
|
||||||
|
setupCluster();
|
||||||
|
try {
|
||||||
|
// Set the policy to default policy to place the block in the default way
|
||||||
|
setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
|
||||||
|
conf, namesystem, networktopology));
|
||||||
|
DatanodeDescriptor datanode1 = blockManager.getDatanodeManager(
|
||||||
|
).getDatanodeCyclicIteration("").iterator().next().getValue();
|
||||||
|
String source = "/dir/file";
|
||||||
|
String parity = xorPrefix + source;
|
||||||
|
|
||||||
|
final Path parityPath = new Path(parity);
|
||||||
|
DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
|
||||||
|
DFSTestUtil.waitReplication(fs, parityPath, (short)1);
|
||||||
|
|
||||||
|
// start one more datanode
|
||||||
|
cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
|
||||||
|
DatanodeDescriptor datanode2 = null;
|
||||||
|
for(Map.Entry<String, DatanodeDescriptor> e : blockManager.getDatanodeManager(
|
||||||
|
).getDatanodeCyclicIteration("")) {
|
||||||
|
final DatanodeDescriptor d = e.getValue();
|
||||||
|
if (!d.getName().equals(datanode1.getName())) {
|
||||||
|
datanode2 = d;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertTrue(datanode2 != null);
|
||||||
|
cluster.waitActive();
|
||||||
|
final Path sourcePath = new Path(source);
|
||||||
|
DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
|
||||||
|
DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
|
||||||
|
|
||||||
|
refreshPolicy();
|
||||||
|
Assert.assertEquals(parity,
|
||||||
|
policy.getParityFile(source));
|
||||||
|
Assert.assertEquals(source,
|
||||||
|
policy.getSourceFile(parity, xorPrefix));
|
||||||
|
|
||||||
|
List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
|
||||||
|
List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
|
||||||
|
Assert.assertEquals(5, sourceBlocks.size());
|
||||||
|
Assert.assertEquals(3, parityBlocks.size());
|
||||||
|
|
||||||
|
// verify the result of getCompanionBlocks()
|
||||||
|
Collection<LocatedBlock> companionBlocks;
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, sourceBlocks.get(0).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{0, 1}, new int[]{0});
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, sourceBlocks.get(1).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{0, 1}, new int[]{0});
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, sourceBlocks.get(2).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{2, 3}, new int[]{1});
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, sourceBlocks.get(3).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{2, 3}, new int[]{1});
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, sourceBlocks.get(4).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{4}, new int[]{2});
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, parityBlocks.get(0).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{0, 1}, new int[]{0});
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, parityBlocks.get(1).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{2, 3}, new int[]{1});
|
||||||
|
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, parityBlocks.get(2).getBlock());
|
||||||
|
verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
||||||
|
new int[]{4}, new int[]{2});
|
||||||
|
|
||||||
|
// Set the policy back to raid policy. We have to create a new object
|
||||||
|
// here to clear the block location cache
|
||||||
|
refreshPolicy();
|
||||||
|
setBlockPlacementPolicy(namesystem, policy);
|
||||||
|
// verify policy deletes the correct blocks. companion blocks should be
|
||||||
|
// evenly distributed.
|
||||||
|
fs.setReplication(sourcePath, (short)1);
|
||||||
|
DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
|
||||||
|
Map<String, Integer> counters = new HashMap<String, Integer>();
|
||||||
|
refreshPolicy();
|
||||||
|
for (int i = 0; i < parityBlocks.size(); i++) {
|
||||||
|
companionBlocks = getCompanionBlocks(
|
||||||
|
namesystem, policy, parityBlocks.get(i).getBlock());
|
||||||
|
|
||||||
|
counters = BlockPlacementPolicyRaid.countCompanionBlocks(
|
||||||
|
companionBlocks, false);
|
||||||
|
Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
|
||||||
|
counters.get(datanode1.getName()) <= 2);
|
||||||
|
Assert.assertTrue(counters.get(datanode1.getName()) +
|
||||||
|
counters.get(datanode2.getName()) ==
|
||||||
|
companionBlocks.size());
|
||||||
|
|
||||||
|
counters = BlockPlacementPolicyRaid.countCompanionBlocks(
|
||||||
|
companionBlocks, true);
|
||||||
|
Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
|
||||||
|
counters.get(datanode1.getParent().getName()) <= 2);
|
||||||
|
Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
|
||||||
|
counters.get(datanode2.getParent().getName()) ==
|
||||||
|
companionBlocks.size());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a new BlockPlacementPolicyRaid to clear the cache
|
||||||
|
private void refreshPolicy() {
|
||||||
|
policy = new BlockPlacementPolicyRaid();
|
||||||
|
policy.initialize(conf, namesystem, networktopology);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
|
||||||
|
List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
|
||||||
|
int[] sourceBlockIndexes, int[] parityBlockIndexes) {
|
||||||
|
Set<ExtendedBlock> blockSet = new HashSet<ExtendedBlock>();
|
||||||
|
for (LocatedBlock b : companionBlocks) {
|
||||||
|
blockSet.add(b.getBlock());
|
||||||
|
}
|
||||||
|
Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
|
||||||
|
blockSet.size());
|
||||||
|
for (int index : sourceBlockIndexes) {
|
||||||
|
Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
|
||||||
|
}
|
||||||
|
for (int index : parityBlockIndexes) {
|
||||||
|
Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyCachedFullPathNameResult(
|
||||||
|
CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
|
||||||
|
throws IOException {
|
||||||
|
String res1 = inode.getFullPathName();
|
||||||
|
String res2 = cachedFullPathNames.get(inode);
|
||||||
|
LOG.info("Actual path name: " + res1);
|
||||||
|
LOG.info("Cached path name: " + res2);
|
||||||
|
Assert.assertEquals(cachedFullPathNames.get(inode),
|
||||||
|
inode.getFullPathName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
|
||||||
|
FSNamesystem namesystem, String file) throws IOException{
|
||||||
|
long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
|
||||||
|
List<LocatedBlock> res1 = NameNodeRaidUtil.getBlockLocations(namesystem,
|
||||||
|
file, 0L, len, false, false).getLocatedBlocks();
|
||||||
|
List<LocatedBlock> res2 = cachedBlocks.get(file);
|
||||||
|
for (int i = 0; i < res1.size(); i++) {
|
||||||
|
LOG.info("Actual block: " + res1.get(i).getBlock());
|
||||||
|
LOG.info("Cached block: " + res2.get(i).getBlock());
|
||||||
|
Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<LocatedBlock> getCompanionBlocks(
|
||||||
|
FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
|
||||||
|
ExtendedBlock block) throws IOException {
|
||||||
|
INodeFile inode = blockManager.blocksMap.getINode(block
|
||||||
|
.getLocalBlock());
|
||||||
|
FileType type = policy.getFileType(inode.getFullPathName());
|
||||||
|
return policy.getCompanionBlocks(inode.getFullPathName(), type,
|
||||||
|
block.getLocalBlock());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
|
||||||
|
throws IOException {
|
||||||
|
long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
|
||||||
|
return NameNodeRaidUtil.getBlockLocations(namesystem,
|
||||||
|
file, 0, len, false, false).getLocatedBlocks();
|
||||||
}
|
}
|
||||||
// private Configuration conf = null;
|
|
||||||
// private MiniDFSCluster cluster = null;
|
|
||||||
// private FSNamesystem namesystem = null;
|
|
||||||
// private BlockPlacementPolicyRaid policy = null;
|
|
||||||
// private FileSystem fs = null;
|
|
||||||
// String[] rack1 = {"/rack1"};
|
|
||||||
// String[] rack2 = {"/rack2"};
|
|
||||||
// String[] host1 = {"host1.rack1.com"};
|
|
||||||
// String[] host2 = {"host2.rack2.com"};
|
|
||||||
// String xorPrefix = null;
|
|
||||||
// String raidTempPrefix = null;
|
|
||||||
// String raidrsTempPrefix = null;
|
|
||||||
// String raidrsHarTempPrefix = null;
|
|
||||||
//
|
|
||||||
// final static Log LOG =
|
|
||||||
// LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
|
|
||||||
//
|
|
||||||
// protected void setupCluster() throws IOException {
|
|
||||||
// conf = new Configuration();
|
|
||||||
// conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
|
|
||||||
// conf.set("dfs.replication.pending.timeout.sec", "2");
|
|
||||||
// conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
|
|
||||||
// conf.set("dfs.block.replicator.classname",
|
|
||||||
// "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid");
|
|
||||||
// conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
|
|
||||||
// conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
|
|
||||||
// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
|
|
||||||
// // start the cluster with one datanode first
|
|
||||||
// cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
|
|
||||||
// format(true).racks(rack1).hosts(host1).build();
|
|
||||||
// cluster.waitActive();
|
|
||||||
// namesystem = cluster.getNameNode().getNamesystem();
|
|
||||||
// Assert.assertTrue("BlockPlacementPolicy type is not correct.",
|
|
||||||
// namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid);
|
|
||||||
// policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator;
|
|
||||||
// fs = cluster.getFileSystem();
|
|
||||||
// xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
|
|
||||||
// raidTempPrefix = RaidNode.xorTempPrefix(conf);
|
|
||||||
// raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
|
|
||||||
// raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Test that the parity files will be placed at the good locations when we
|
|
||||||
// * create them.
|
|
||||||
// */
|
|
||||||
// @Test
|
|
||||||
// public void testChooseTargetForRaidFile() throws IOException {
|
|
||||||
// setupCluster();
|
|
||||||
// try {
|
|
||||||
// String src = "/dir/file";
|
|
||||||
// String parity = raidrsTempPrefix + src;
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
|
|
||||||
// DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
|
|
||||||
// refreshPolicy();
|
|
||||||
// setBlockPlacementPolicy(namesystem, policy);
|
|
||||||
// // start 3 more datanodes
|
|
||||||
// String[] racks = {"/rack2", "/rack2", "/rack2",
|
|
||||||
// "/rack2", "/rack2", "/rack2"};
|
|
||||||
// String[] hosts =
|
|
||||||
// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
|
|
||||||
// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
|
|
||||||
// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
|
|
||||||
// int numBlocks = 6;
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
|
|
||||||
// DFSTestUtil.waitReplication(fs, new Path(parity), (short)2);
|
|
||||||
// FileStatus srcStat = fs.getFileStatus(new Path(src));
|
|
||||||
// BlockLocation[] srcLoc =
|
|
||||||
// fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
|
|
||||||
// FileStatus parityStat = fs.getFileStatus(new Path(parity));
|
|
||||||
// BlockLocation[] parityLoc =
|
|
||||||
// fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
|
|
||||||
// int parityLen = RaidNode.rsParityLength(conf);
|
|
||||||
// for (int i = 0; i < numBlocks / parityLen; i++) {
|
|
||||||
// Set<String> locations = new HashSet<String>();
|
|
||||||
// for (int j = 0; j < srcLoc.length; j++) {
|
|
||||||
// String [] names = srcLoc[j].getNames();
|
|
||||||
// for (int k = 0; k < names.length; k++) {
|
|
||||||
// LOG.info("Source block location: " + names[k]);
|
|
||||||
// locations.add(names[k]);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// for (int j = 0 ; j < parityLen; j++) {
|
|
||||||
// String[] names = parityLoc[j + i * parityLen].getNames();
|
|
||||||
// for (int k = 0; k < names.length; k++) {
|
|
||||||
// LOG.info("Parity block location: " + names[k]);
|
|
||||||
// Assert.assertTrue(locations.add(names[k]));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } finally {
|
|
||||||
// if (cluster != null) {
|
|
||||||
// cluster.shutdown();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Test that the har parity files will be placed at the good locations when we
|
|
||||||
// * create them.
|
|
||||||
// */
|
|
||||||
// @Test
|
|
||||||
// public void testChooseTargetForHarRaidFile() throws IOException {
|
|
||||||
// setupCluster();
|
|
||||||
// try {
|
|
||||||
// String[] racks = {"/rack2", "/rack2", "/rack2",
|
|
||||||
// "/rack2", "/rack2", "/rack2"};
|
|
||||||
// String[] hosts =
|
|
||||||
// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
|
|
||||||
// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
|
|
||||||
// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
|
|
||||||
// String harParity = raidrsHarTempPrefix + "/dir/file";
|
|
||||||
// int numBlocks = 11;
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
|
|
||||||
// DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
|
|
||||||
// FileStatus stat = fs.getFileStatus(new Path(harParity));
|
|
||||||
// BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
|
|
||||||
// int rsParityLength = RaidNode.rsParityLength(conf);
|
|
||||||
// for (int i = 0; i < numBlocks - rsParityLength; i++) {
|
|
||||||
// Set<String> locations = new HashSet<String>();
|
|
||||||
// for (int j = 0; j < rsParityLength; j++) {
|
|
||||||
// for (int k = 0; k < loc[i + j].getNames().length; k++) {
|
|
||||||
// // verify that every adjacent 4 blocks are on differnt nodes
|
|
||||||
// String name = loc[i + j].getNames()[k];
|
|
||||||
// LOG.info("Har Raid block location: " + name);
|
|
||||||
// Assert.assertTrue(locations.add(name));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } finally {
|
|
||||||
// if (cluster != null) {
|
|
||||||
// cluster.shutdown();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Test BlockPlacementPolicyRaid.CachedLocatedBlocks
|
|
||||||
// * Verify that the results obtained from cache is the same as
|
|
||||||
// * the results obtained directly
|
|
||||||
// */
|
|
||||||
// @Test
|
|
||||||
// public void testCachedBlocks() throws IOException {
|
|
||||||
// setupCluster();
|
|
||||||
// try {
|
|
||||||
// String file1 = "/dir/file1";
|
|
||||||
// String file2 = "/dir/file2";
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
|
|
||||||
// // test blocks cache
|
|
||||||
// CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
|
|
||||||
// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
|
|
||||||
// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
|
|
||||||
// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
|
|
||||||
// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
|
|
||||||
// try {
|
|
||||||
// Thread.sleep(1200L);
|
|
||||||
// } catch (InterruptedException e) {
|
|
||||||
// }
|
|
||||||
// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
|
|
||||||
// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
|
|
||||||
// } finally {
|
|
||||||
// if (cluster != null) {
|
|
||||||
// cluster.shutdown();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Test BlockPlacementPolicyRaid.CachedFullPathNames
|
|
||||||
// * Verify that the results obtained from cache is the same as
|
|
||||||
// * the results obtained directly
|
|
||||||
// */
|
|
||||||
// @Test
|
|
||||||
// public void testCachedPathNames() throws IOException {
|
|
||||||
// setupCluster();
|
|
||||||
// try {
|
|
||||||
// String file1 = "/dir/file1";
|
|
||||||
// String file2 = "/dir/file2";
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
|
|
||||||
// // test full path cache
|
|
||||||
// CachedFullPathNames cachedFullPathNames =
|
|
||||||
// new CachedFullPathNames(namesystem);
|
|
||||||
// FSInodeInfo inode1 = null;
|
|
||||||
// FSInodeInfo inode2 = null;
|
|
||||||
// NameNodeRaidTestUtil.readLock(namesystem.dir);
|
|
||||||
// try {
|
|
||||||
// inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true);
|
|
||||||
// inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true);
|
|
||||||
// } finally {
|
|
||||||
// NameNodeRaidTestUtil.readUnLock(namesystem.dir);
|
|
||||||
// }
|
|
||||||
// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
|
|
||||||
// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
|
|
||||||
// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
|
|
||||||
// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
|
|
||||||
// try {
|
|
||||||
// Thread.sleep(1200L);
|
|
||||||
// } catch (InterruptedException e) {
|
|
||||||
// }
|
|
||||||
// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
|
|
||||||
// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
|
|
||||||
// } finally {
|
|
||||||
// if (cluster != null) {
|
|
||||||
// cluster.shutdown();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// /**
|
|
||||||
// * Test the result of getCompanionBlocks() on the unraided files
|
|
||||||
// */
|
|
||||||
// @Test
|
|
||||||
// public void testGetCompanionBLocks() throws IOException {
|
|
||||||
// setupCluster();
|
|
||||||
// try {
|
|
||||||
// String file1 = "/dir/file1";
|
|
||||||
// String file2 = "/raid/dir/file2";
|
|
||||||
// String file3 = "/raidrs/dir/file3";
|
|
||||||
// // Set the policy to default policy to place the block in the default way
|
|
||||||
// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
|
|
||||||
// conf, namesystem, namesystem.clusterMap));
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
|
|
||||||
// DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
|
|
||||||
// Collection<LocatedBlock> companionBlocks;
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
|
|
||||||
// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
|
|
||||||
// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
|
|
||||||
// Assert.assertEquals(1, companionBlocks.size());
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
|
|
||||||
// Assert.assertEquals(1, companionBlocks.size());
|
|
||||||
//
|
|
||||||
// int rsParityLength = RaidNode.rsParityLength(conf);
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
|
|
||||||
// Assert.assertEquals(rsParityLength, companionBlocks.size());
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
|
|
||||||
// Assert.assertEquals(rsParityLength, companionBlocks.size());
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
|
|
||||||
// Assert.assertEquals(2, companionBlocks.size());
|
|
||||||
// } finally {
|
|
||||||
// if (cluster != null) {
|
|
||||||
// cluster.shutdown();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// static void setBlockPlacementPolicy(
|
|
||||||
// FSNamesystem namesystem, BlockPlacementPolicy policy) {
|
|
||||||
// namesystem.writeLock();
|
|
||||||
// try {
|
|
||||||
// namesystem.blockManager.replicator = policy;
|
|
||||||
// } finally {
|
|
||||||
// namesystem.writeUnlock();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Test BlockPlacementPolicyRaid actually deletes the correct replica.
|
|
||||||
// * Start 2 datanodes and create 1 source file and its parity file.
|
|
||||||
// * 1) Start host1, create the parity file with replication 1
|
|
||||||
// * 2) Start host2, create the source file with replication 2
|
|
||||||
// * 3) Set repliation of source file to 1
|
|
||||||
// * Verify that the policy should delete the block with more companion blocks.
|
|
||||||
// */
|
|
||||||
// @Test
|
|
||||||
// public void testDeleteReplica() throws IOException {
|
|
||||||
// setupCluster();
|
|
||||||
// try {
|
|
||||||
// // Set the policy to default policy to place the block in the default way
|
|
||||||
// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
|
|
||||||
// conf, namesystem, namesystem.clusterMap));
|
|
||||||
// DatanodeDescriptor datanode1 =
|
|
||||||
// NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next();
|
|
||||||
// String source = "/dir/file";
|
|
||||||
// String parity = xorPrefix + source;
|
|
||||||
//
|
|
||||||
// final Path parityPath = new Path(parity);
|
|
||||||
// DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
|
|
||||||
// DFSTestUtil.waitReplication(fs, parityPath, (short)1);
|
|
||||||
//
|
|
||||||
// // start one more datanode
|
|
||||||
// cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
|
|
||||||
// DatanodeDescriptor datanode2 = null;
|
|
||||||
// for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) {
|
|
||||||
// if (!d.getName().equals(datanode1.getName())) {
|
|
||||||
// datanode2 = d;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// Assert.assertTrue(datanode2 != null);
|
|
||||||
// cluster.waitActive();
|
|
||||||
// final Path sourcePath = new Path(source);
|
|
||||||
// DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
|
|
||||||
// DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
|
|
||||||
//
|
|
||||||
// refreshPolicy();
|
|
||||||
// Assert.assertEquals(parity,
|
|
||||||
// policy.getParityFile(source));
|
|
||||||
// Assert.assertEquals(source,
|
|
||||||
// policy.getSourceFile(parity, xorPrefix));
|
|
||||||
//
|
|
||||||
// List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
|
|
||||||
// List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
|
|
||||||
// Assert.assertEquals(5, sourceBlocks.size());
|
|
||||||
// Assert.assertEquals(3, parityBlocks.size());
|
|
||||||
//
|
|
||||||
// // verify the result of getCompanionBlocks()
|
|
||||||
// Collection<LocatedBlock> companionBlocks;
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, sourceBlocks.get(0).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{0, 1}, new int[]{0});
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, sourceBlocks.get(1).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{0, 1}, new int[]{0});
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, sourceBlocks.get(2).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{2, 3}, new int[]{1});
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, sourceBlocks.get(3).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{2, 3}, new int[]{1});
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, sourceBlocks.get(4).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{4}, new int[]{2});
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, parityBlocks.get(0).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{0, 1}, new int[]{0});
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, parityBlocks.get(1).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{2, 3}, new int[]{1});
|
|
||||||
//
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, parityBlocks.get(2).getBlock());
|
|
||||||
// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
|
|
||||||
// new int[]{4}, new int[]{2});
|
|
||||||
//
|
|
||||||
// // Set the policy back to raid policy. We have to create a new object
|
|
||||||
// // here to clear the block location cache
|
|
||||||
// refreshPolicy();
|
|
||||||
// setBlockPlacementPolicy(namesystem, policy);
|
|
||||||
// // verify policy deletes the correct blocks. companion blocks should be
|
|
||||||
// // evenly distributed.
|
|
||||||
// fs.setReplication(sourcePath, (short)1);
|
|
||||||
// DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
|
|
||||||
// Map<String, Integer> counters = new HashMap<String, Integer>();
|
|
||||||
// refreshPolicy();
|
|
||||||
// for (int i = 0; i < parityBlocks.size(); i++) {
|
|
||||||
// companionBlocks = getCompanionBlocks(
|
|
||||||
// namesystem, policy, parityBlocks.get(i).getBlock());
|
|
||||||
//
|
|
||||||
// counters = BlockPlacementPolicyRaid.countCompanionBlocks(
|
|
||||||
// companionBlocks, false);
|
|
||||||
// Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
|
|
||||||
// counters.get(datanode1.getName()) <= 2);
|
|
||||||
// Assert.assertTrue(counters.get(datanode1.getName()) +
|
|
||||||
// counters.get(datanode2.getName()) ==
|
|
||||||
// companionBlocks.size());
|
|
||||||
//
|
|
||||||
// counters = BlockPlacementPolicyRaid.countCompanionBlocks(
|
|
||||||
// companionBlocks, true);
|
|
||||||
// Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
|
|
||||||
// counters.get(datanode1.getParent().getName()) <= 2);
|
|
||||||
// Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
|
|
||||||
// counters.get(datanode2.getParent().getName()) ==
|
|
||||||
// companionBlocks.size());
|
|
||||||
// }
|
|
||||||
// } finally {
|
|
||||||
// if (cluster != null) {
|
|
||||||
// cluster.shutdown();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // create a new BlockPlacementPolicyRaid to clear the cache
|
|
||||||
// private void refreshPolicy() {
|
|
||||||
// policy = new BlockPlacementPolicyRaid();
|
|
||||||
// policy.initialize(conf, namesystem, namesystem.clusterMap);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
|
|
||||||
// List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
|
|
||||||
// int[] sourceBlockIndexes, int[] parityBlockIndexes) {
|
|
||||||
// Set<ExtendedBlock> blockSet = new HashSet<ExtendedBlock>();
|
|
||||||
// for (LocatedBlock b : companionBlocks) {
|
|
||||||
// blockSet.add(b.getBlock());
|
|
||||||
// }
|
|
||||||
// Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
|
|
||||||
// blockSet.size());
|
|
||||||
// for (int index : sourceBlockIndexes) {
|
|
||||||
// Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
|
|
||||||
// }
|
|
||||||
// for (int index : parityBlockIndexes) {
|
|
||||||
// Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void verifyCachedFullPathNameResult(
|
|
||||||
// CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
|
|
||||||
// throws IOException {
|
|
||||||
// String res1 = inode.getFullPathName();
|
|
||||||
// String res2 = cachedFullPathNames.get(inode);
|
|
||||||
// LOG.info("Actual path name: " + res1);
|
|
||||||
// LOG.info("Cached path name: " + res2);
|
|
||||||
// Assert.assertEquals(cachedFullPathNames.get(inode),
|
|
||||||
// inode.getFullPathName());
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
|
|
||||||
// FSNamesystem namesystem, String file) throws IOException{
|
|
||||||
// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
|
|
||||||
// List<LocatedBlock> res1 = NameNodeRaidUtil.getBlockLocations(namesystem,
|
|
||||||
// file, 0L, len, false, false).getLocatedBlocks();
|
|
||||||
// List<LocatedBlock> res2 = cachedBlocks.get(file);
|
|
||||||
// for (int i = 0; i < res1.size(); i++) {
|
|
||||||
// LOG.info("Actual block: " + res1.get(i).getBlock());
|
|
||||||
// LOG.info("Cached block: " + res2.get(i).getBlock());
|
|
||||||
// Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private Collection<LocatedBlock> getCompanionBlocks(
|
|
||||||
// FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
|
|
||||||
// ExtendedBlock block) throws IOException {
|
|
||||||
// INodeFile inode = namesystem.blockManager.blocksMap.getINode(block
|
|
||||||
// .getLocalBlock());
|
|
||||||
// FileType type = policy.getFileType(inode.getFullPathName());
|
|
||||||
// return policy.getCompanionBlocks(inode.getFullPathName(), type,
|
|
||||||
// block.getLocalBlock());
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
|
|
||||||
// throws IOException {
|
|
||||||
// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
|
|
||||||
// return NameNodeRaidUtil.getBlockLocations(namesystem,
|
|
||||||
// file, 0, len, false, false).getLocatedBlocks();
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,33 +17,21 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.*;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.*;
|
|
||||||
import org.apache.hadoop.fs.*;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.*;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.*;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
|
||||||
|
|
||||||
public class NameNodeRaidTestUtil {
|
public class NameNodeRaidTestUtil {
|
||||||
public static void readLock(final FSDirectory dir) {
|
public static FSInodeInfo[] getFSInodeInfo(final FSNamesystem namesystem,
|
||||||
|
final String... files) throws UnresolvedLinkException {
|
||||||
|
final FSInodeInfo[] inodes = new FSInodeInfo[files.length];
|
||||||
|
final FSDirectory dir = namesystem.dir;
|
||||||
dir.readLock();
|
dir.readLock();
|
||||||
|
try {
|
||||||
|
for(int i = 0; i < files.length; i++) {
|
||||||
|
inodes[i] = dir.rootDir.getNode(files[i], true);
|
||||||
}
|
}
|
||||||
|
return inodes;
|
||||||
public static void readUnLock(final FSDirectory dir) {
|
} finally {
|
||||||
dir.readUnlock();
|
dir.readUnlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static FSInodeInfo getNode(final FSDirectory dir,
|
|
||||||
final String src, final boolean resolveLink
|
|
||||||
) throws UnresolvedLinkException {
|
|
||||||
return dir.rootDir.getNode(src, resolveLink);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// public static NavigableMap<String, DatanodeDescriptor> getDatanodeMap(
|
|
||||||
// final FSNamesystem namesystem) {
|
|
||||||
// return namesystem.datanodeMap;
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue