HDFS-9142. Separating Configuration object for namenode(s) in MiniDFSCluster. (Siqi Li via mingma)

(cherry picked from commit de8efc65a4)
This commit is contained in:
Ming Ma 2015-10-09 11:10:46 -07:00
parent 61988f8016
commit 02380e0156
3 changed files with 103 additions and 24 deletions

View File

@ -1178,6 +1178,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9137. DeadLock between DataNode#refreshVolumes and HDFS-9137. DeadLock between DataNode#refreshVolumes and
BPOfferService#registrationSucceeded. (Uma Maheswara Rao G via yliu) BPOfferService#registrationSucceeded. (Uma Maheswara Rao G via yliu)
HDFS-9142. Separating Configuration object for namenode(s) in
MiniDFSCluster. (Siqi Li via mingma)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -40,6 +40,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
@ -850,6 +851,44 @@ public class MiniDFSCluster {
shutdown(); shutdown();
} }
} }
for (NameNodeInfo nn : nameNodes) {
Configuration nnConf = nn.conf;
for (NameNodeInfo nnInfo : nameNodes) {
if (nn.equals(nnInfo)) {
continue;
}
copyKeys(conf, nnConf, nnInfo.nameserviceId, nnInfo.nnId);
}
}
}
private static void copyKeys(Configuration srcConf, Configuration destConf,
String nameserviceId, String nnId) {
String key = DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nameserviceId, nnId);
destConf.set(key, srcConf.get(key));
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
nameserviceId, nnId);
String val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
nameserviceId, nnId);
val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
nameserviceId, nnId);
val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
} }
/** /**
@ -985,15 +1024,13 @@ public class MiniDFSCluster {
// Start all Namenodes // Start all Namenodes
for (NNConf nn : nameservice.getNNs()) { for (NNConf nn : nameservice.getNNs()) {
initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, Configuration hdfsConf = new Configuration(conf);
initNameNodeConf(hdfsConf, nsId, nn.getNnId(), manageNameDfsDirs,
enableManagedDfsDirsRedundancy, nnCounter); enableManagedDfsDirsRedundancy, nnCounter);
createNameNode(nnCounter, conf, numDataNodes, false, operation, createNameNode(nnCounter, hdfsConf, numDataNodes, false, operation,
clusterId, nsId, nn.getNnId()); clusterId, nsId, nn.getNnId());
// Record the last namenode uri // Record the last namenode uri
if (nameNodes[nnCounter] != null && nameNodes[nnCounter].conf != null) { lastDefaultFileSystem = hdfsConf.get(FS_DEFAULT_NAME_KEY);
lastDefaultFileSystem =
nameNodes[nnCounter].conf.get(FS_DEFAULT_NAME_KEY);
}
nnCounter++; nnCounter++;
} }
if (!federation && lastDefaultFileSystem != null) { if (!federation && lastDefaultFileSystem != null) {
@ -1100,50 +1137,43 @@ public class MiniDFSCluster {
return args; return args;
} }
private void createNameNode(int nnIndex, Configuration conf, private void createNameNode(int nnIndex, Configuration hdfsConf,
int numDataNodes, boolean format, StartupOption operation, int numDataNodes, boolean format, StartupOption operation,
String clusterId, String nameserviceId, String clusterId, String nameserviceId,
String nnId) String nnId)
throws IOException { throws IOException {
// Format and clean out DataNode directories // Format and clean out DataNode directories
if (format) { if (format) {
DFSTestUtil.formatNameNode(conf); DFSTestUtil.formatNameNode(hdfsConf);
} }
if (operation == StartupOption.UPGRADE){ if (operation == StartupOption.UPGRADE){
operation.setClusterId(clusterId); operation.setClusterId(clusterId);
} }
// Start the NameNode after saving the default file system. // Start the NameNode after saving the default file system.
String originalDefaultFs = conf.get(FS_DEFAULT_NAME_KEY);
String[] args = createArgs(operation); String[] args = createArgs(operation);
NameNode nn = NameNode.createNameNode(args, conf); NameNode nn = NameNode.createNameNode(args, hdfsConf);
if (operation == StartupOption.RECOVER) { if (operation == StartupOption.RECOVER) {
return; return;
} }
// After the NN has started, set back the bound ports into // After the NN has started, set back the bound ports into
// the conf // the conf
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nameserviceId, nnId), nn.getNameNodeAddressHostPortString()); nameserviceId, nnId), nn.getNameNodeAddressHostPortString());
if (nn.getHttpAddress() != null) { if (nn.getHttpAddress() != null) {
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress())); nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress()));
} }
if (nn.getHttpsAddress() != null) { if (nn.getHttpsAddress() != null) {
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpsAddress())); nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpsAddress()));
} }
copyKeys(hdfsConf, conf, nameserviceId, nnId);
DFSUtil.setGenericConf(conf, nameserviceId, nnId, DFSUtil.setGenericConf(hdfsConf, nameserviceId, nnId,
DFS_NAMENODE_HTTP_ADDRESS_KEY); DFS_NAMENODE_HTTP_ADDRESS_KEY);
nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
operation, new Configuration(conf)); operation, hdfsConf);
// Restore the default fs name
if (originalDefaultFs == null) {
conf.set(FS_DEFAULT_NAME_KEY, "");
} else {
conf.set(FS_DEFAULT_NAME_KEY, originalDefaultFs);
}
} }
/** /**
@ -2759,7 +2789,7 @@ public class MiniDFSCluster {
* *
* @return newly started namenode * @return newly started namenode
*/ */
public NameNode addNameNode(Configuration conf, int namenodePort) public void addNameNode(Configuration conf, int namenodePort)
throws IOException { throws IOException {
if(!federation) if(!federation)
throw new IOException("cannot add namenode to non-federated cluster"); throw new IOException("cannot add namenode to non-federated cluster");
@ -2790,7 +2820,6 @@ public class MiniDFSCluster {
// Wait for new namenode to get registrations from all the datanodes // Wait for new namenode to get registrations from all the datanodes
waitActive(nnIndex); waitActive(nnIndex);
return nameNodes[nnIndex].nameNode;
} }
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile, protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
@ -28,6 +29,7 @@ import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
@ -182,4 +184,49 @@ public class TestMiniDFSCluster {
MiniDFSCluster.shutdownCluster(cluster); MiniDFSCluster.shutdownCluster(cluster);
} }
} }
@Test
public void testSetUpFederatedCluster() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).nnTopology(
MiniDFSNNTopology.simpleHAFederatedTopology(2))
.numDataNodes(2)
.build();
try {
cluster.waitActive();
cluster.transitionToActive(1);
cluster.transitionToActive(3);
assertEquals("standby", cluster.getNamesystem(0).getHAState());
assertEquals("active", cluster.getNamesystem(1).getHAState());
assertEquals("standby", cluster.getNamesystem(2).getHAState());
assertEquals("active", cluster.getNamesystem(3).getHAState());
String ns0nn0 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn0"));
String ns0nn1 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn1"));
String ns1nn0 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn0"));
String ns1nn1 = conf.get(
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn1"));
for(NameNodeInfo nnInfo : cluster.getNameNodeInfos()) {
assertEquals(ns0nn0, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn0")));
assertEquals(ns0nn1, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn1")));
assertEquals(ns1nn0, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn0")));
assertEquals(ns1nn1, nnInfo.conf.get(
DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn1")));
}
} finally {
MiniDFSCluster.shutdownCluster(cluster);
}
}
} }