HDFS-5742. Merging r1570067 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1570068 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2014-02-20 03:21:26 +00:00
parent 01730690f3
commit 10e98ce6bb
8 changed files with 57 additions and 42 deletions

CHANGES.txt

@@ -203,6 +203,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5483. NN should gracefully handle multiple block replicas on same DN.
     (Arpit Agarwal)
 
+    HDFS-5742. DatanodeCluster (mini cluster of DNs) fails to start.
+    (Arpit Agarwal)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

FSImage.java

@@ -143,6 +143,7 @@ void format(FSNamesystem fsn, String clusterId) throws IOException {
         "FSImage.format should be called with an uninitialized namesystem, has " +
         fileCount + " files");
     NamespaceInfo ns = NNStorage.newNamespaceInfo();
+    LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
     ns.clusterID = clusterId;
     storage.format(ns);
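
Side note: NNStorage.newNamespaceInfo() allocates a fresh block pool id of the form BP-<random>-<NN IP>-<creation time in ms>, e.g. BP-329001092-127.0.0.1-1392865732000 (the value is illustrative). Logging it at format time gives operators the id they need to hand to a standalone DataNodeCluster run via the new -bpid option below.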

DataNodeCluster.java

@@ -68,9 +68,10 @@ public class DataNodeCluster {
   static String dataNodeDirs = DATANODE_DIRS;
   static final String USAGE =
     "Usage: datanodecluster " +
       " -n <numDataNodes> " +
+      " -bpid <bpid>" +
       " [-racks <numRacks>] " +
-      " [-simulated] " +
+      " [-simulated [<simulatedCapacityPerDn>]] " +
       " [-inject startingBlockId numBlocksPerDN]" +
       " [-r replicationFactorForInjectedBlocks]" +
       " [-d dataNodeDirs]\n" +
@@ -91,7 +92,7 @@ static void printUsageExit(String err) {
     printUsageExit();
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws InterruptedException {
     int numDataNodes = 0;
     int numRacks = 0;
     boolean inject = false;
@@ -99,6 +100,8 @@ public static void main(String[] args) {
     int numBlocksPerDNtoInject = 0;
     int replication = 1;
     boolean checkDataNodeAddrConfig = false;
+    long simulatedCapacityPerDn = SimulatedFSDataset.DEFAULT_CAPACITY;
+    String bpid = null;
 
     Configuration conf = new HdfsConfiguration();
@@ -115,7 +118,7 @@ public static void main(String[] args) {
         numRacks = Integer.parseInt(args[i]);
       } else if (args[i].equals("-r")) {
         if (++i >= args.length || args[i].startsWith("-")) {
-          printUsageExit("Missing replicaiton factor");
+          printUsageExit("Missing replication factor");
         }
         replication = Integer.parseInt(args[i]);
       } else if (args[i].equals("-d")) {
@@ -125,6 +128,14 @@ public static void main(String[] args) {
         dataNodeDirs = args[i];
       } else if (args[i].equals("-simulated")) {
         SimulatedFSDataset.setFactory(conf);
+        if ((i+1) < args.length && !args[i+1].startsWith("-")) {
+          simulatedCapacityPerDn = Long.parseLong(args[++i]);
+        }
+      } else if (args[i].equals("-bpid")) {
+        if (++i >= args.length || args[i].startsWith("-")) {
+          printUsageExit("Missing blockpoolid parameter");
+        }
+        bpid = args[i];
       } else if (args[i].equals("-inject")) {
         if (!FsDatasetSpi.Factory.getFactory(conf).isSimulated()) {
           System.out.print("-inject is valid only for simulated");
@@ -153,6 +164,9 @@ public static void main(String[] args) {
       printUsageExit("Replication must be less than or equal to numDataNodes");
     }
+    if (bpid == null) {
+      printUsageExit("BlockPoolId must be provided");
+    }
     String nameNodeAdr = FileSystem.getDefaultUri(conf).getAuthority();
     if (nameNodeAdr == null) {
       System.out.println("No name node address and port in config");
@@ -162,9 +176,14 @@ public static void main(String[] args) {
     System.out.println("Starting " + numDataNodes +
           (simulated ? " Simulated " : " ") +
           " Data Nodes that will connect to Name Node at " + nameNodeAdr);
 
     System.setProperty("test.build.data", dataNodeDirs);
 
+    long simulatedCapacities[] = new long[numDataNodes];
+    for (int i = 0; i < numDataNodes; ++i) {
+      simulatedCapacities[i] = simulatedCapacityPerDn;
+    }
+
     MiniDFSCluster mc = new MiniDFSCluster();
     try {
       mc.formatDataNodeDirs();
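
Since DataNodeCluster already imports java.util.Arrays (it calls Arrays.asList below), the fill loop above could equivalently be written as a one-liner:

    Arrays.fill(simulatedCapacities, simulatedCapacityPerDn);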
@@ -182,13 +201,12 @@ public static void main(String[] args) {
         //rack4DataNode[i] = racks[i%numRacks];
         rack4DataNode[i] = rackPrefix + "-" + i%numRacks;
         System.out.println("Data Node " + i + " using " + rack4DataNode[i]);
       }
     }
     try {
       mc.startDataNodes(conf, numDataNodes, true, StartupOption.REGULAR,
-          rack4DataNode, null, null, false, checkDataNodeAddrConfig);
+          rack4DataNode, null, simulatedCapacities, false, checkDataNodeAddrConfig);
+      Thread.sleep(10*1000); // Give the DN some time to connect to NN and init storage directories.
       if (inject) {
         long blockSize = 10;
         System.out.println("Injecting " + numBlocksPerDNtoInject +
@@ -203,7 +221,7 @@ public static void main(String[] args) {
           }
           for (int i = 1; i <= replication; ++i) {
             // inject blocks for dn_i into dn_i and replica in dn_i's neighbors
-            mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks));
+            mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks), bpid);
             System.out.println("Injecting blocks of dn " + i_dn + " into dn" +
                 ((i_dn + i- 1)% numDataNodes));
           }

MiniDFSCluster.java

@@ -2045,17 +2045,19 @@ public List<Map<DatanodeStorage, BlockListAsLongs>> getAllBlockReports(String bp
     return result;
   }
 
   /**
    * This method is valid only if the data nodes have simulated data
    * @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes()
    * @param blocksToInject - the blocks
+   * @param bpid - (optional) the block pool id to use for injecting blocks.
+   *               If not supplied then it is queried from the in-process NameNode.
    * @throws IOException
    *             if not simulatedFSDataset
    *             if any of blocks already exist in the data node
    *
    */
-  public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException {
+  public void injectBlocks(int dataNodeIndex,
+      Iterable<Block> blocksToInject, String bpid) throws IOException {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
@@ -2064,7 +2066,9 @@ public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) thro
     if (!(dataSet instanceof SimulatedFSDataset)) {
       throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
     }
-    String bpid = getNamesystem().getBlockPoolId();
+    if (bpid == null) {
+      bpid = getNamesystem().getBlockPoolId();
+    }
     SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
     sdataset.injectBlocks(bpid, blocksToInject);
     dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
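
A minimal caller sketch for the changed signature (cluster and blocks are hypothetical names):

    // Explicit block pool id, e.g. from a tool with no in-process NameNode
    // (the id is illustrative):
    cluster.injectBlocks(0, blocks, "BP-329001092-127.0.0.1-1392865732000");

    // Or pass null to resolve the id from the in-process NameNode:
    cluster.injectBlocks(0, blocks, null);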
@@ -2089,25 +2093,6 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
     dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
   }
 
-  /**
-   * This method is valid only if the data nodes have simulated data
-   * @param blocksToInject - blocksToInject[] is indexed in the same order as the list
-   *             of datanodes returned by getDataNodes()
-   * @throws IOException
-   *             if not simulatedFSDataset
-   *             if any of blocks already exist in the data nodes
-   *             Note the rest of the blocks are not injected.
-   */
-  public void injectBlocks(Iterable<Block>[] blocksToInject)
-      throws IOException {
-    if (blocksToInject.length > dataNodes.size()) {
-      throw new IndexOutOfBoundsException();
-    }
-    for (int i = 0; i < blocksToInject.length; ++i) {
-      injectBlocks(i, blocksToInject[i]);
-    }
-  }
-
   /**
    * Set the softLimit and hardLimit of client lease periods
    */
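
Any caller that still needs the removed bulk overload can reproduce it with a loop over the retained method; a sketch assuming the same blocksToInject array as before:

    for (int i = 0; i < blocksToInject.length; ++i) {
      cluster.injectBlocks(i, blocksToInject[i], null);
    }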
@@ -2154,11 +2139,13 @@ public String getDataDirectory() {
    * @return the base directory for this instance.
    */
   protected String determineDfsBaseDir() {
-    String dfsdir = conf.get(HDFS_MINIDFS_BASEDIR, null);
-    if (dfsdir == null) {
-      dfsdir = getBaseDirectory();
+    if (conf != null) {
+      final String dfsdir = conf.get(HDFS_MINIDFS_BASEDIR, null);
+      if (dfsdir != null) {
+        return dfsdir;
+      }
     }
-    return dfsdir;
+    return getBaseDirectory();
   }
 
   /**
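
The null guard matters because DataNodeCluster builds a bare MiniDFSCluster via the no-argument constructor, so conf is still unset when determineDfsBaseDir() runs; dereferencing it was presumably the startup failure fixed here. Tests that do have a Configuration can still override the base directory, for example:

    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, "/tmp/test-dfs"); // path is illustrative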

TestInjectionForSimulatedStorage.java

@@ -186,7 +186,7 @@ public void testInjection() throws IOException {
       // Insert all the blocks in the first data node
       LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
-      cluster.injectBlocks(0, uniqueBlocks);
+      cluster.injectBlocks(0, uniqueBlocks, null);
 
       dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                   cluster.getNameNodePort()),

TestBalancer.java

@@ -209,7 +209,7 @@ private void testUnevenDistribution(Configuration conf,
         ClientProtocol.class).getProxy();
 
     for(int i = 0; i < blocksDN.length; i++)
-      cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
+      cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
 
     final long totalCapacity = sum(capacities);
     runBalancer(conf, totalUsedSpace, totalCapacity);

TestReadOnlySharedStorage.java

@@ -141,7 +141,7 @@ public void setup() throws IOException, InterruptedException {
     validateNumberReplicas(1);
 
     // Inject the block into the datanode with READ_ONLY_SHARED storage
-    cluster.injectBlocks(RO_NODE_INDEX, Collections.singleton(block));
+    cluster.injectBlocks(0, RO_NODE_INDEX, Collections.singleton(block));
 
     // There should now be 2 *locations* for the block
     // Must wait until the NameNode has processed the block report for the injected blocks

CreateEditsLog.java

@@ -57,7 +57,7 @@ public class CreateEditsLog {
       GenerationStamp.LAST_RESERVED_STAMP;
 
   static void addFiles(FSEditLog editLog, int numFiles, short replication,
-      int blocksPerFile, long startingBlockId,
+      int blocksPerFile, long startingBlockId, long blockSize,
       FileNameGenerator nameGenerator) {
 
     PermissionStatus p = new PermissionStatus("joeDoe", "people",
@@ -66,7 +66,6 @@ static void addFiles(FSEditLog editLog, int numFiles, short replication,
     INodeDirectory dirInode = new INodeDirectory(inodeId.nextValue(), null, p,
         0L);
     editLog.logMkDir(BASE_PATH, dirInode);
-    long blockSize = 10;
     BlockInfo[] blocks = new BlockInfo[blocksPerFile];
     for (int iB = 0; iB < blocksPerFile; ++iB) {
       blocks[iB] =
@@ -144,6 +143,7 @@ public static void main(String[] args)
     int numFiles = 0;
     short replication = 1;
     int numBlocksPerFile = 0;
+    long blockSize = 10;
 
     if (args.length == 0) {
       printUsageExit();
@@ -164,10 +164,16 @@ public static void main(String[] args)
           if (numFiles <=0 || numBlocksPerFile <= 0) {
             printUsageExit("numFiles and numBlocksPerFile most be greater than 0");
           }
+        } else if (args[i].equals("-l")) {
+          if (i + 1 >= args.length) {
+            printUsageExit(
+                "Missing block length");
+          }
+          blockSize = Long.parseLong(args[++i]);
         } else if (args[i].equals("-r") || args[i+1].startsWith("-")) {
           if (i + 1 >= args.length) {
             printUsageExit(
-                "Missing num files, starting block and/or number of blocks");
+                "Missing replication factor");
           }
           replication = Short.parseShort(args[++i]);
         } else if (args[i].equals("-d")) {
@@ -202,7 +208,7 @@ public static void main(String[] args)
     FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir);
     editLog.openForWrite();
     addFiles(editLog, numFiles, replication, numBlocksPerFile, startingBlockId,
-        nameGenerator);
+        blockSize, nameGenerator);
     editLog.logSync();
     editLog.close();
   }
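
With the new -l option, a run emitting larger blocks might look like this (a sketch: the -f argument group is inferred from the parser's error message above and is not shown in this diff, so treat its exact spelling as an assumption):

    $ hadoop org.apache.hadoop.hdfs.server.namenode.CreateEditsLog \
        -f 1000 0 3 -l 64 -r 2 -d /tmp/editsLog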