HDFS-2805. Add a test for a federated cluster with HA NNs. Contributed by Brandon Li.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1236471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fdf7b18247
commit
e7775e0b3b
|
@ -133,3 +133,5 @@ HDFS-2807. Service level authorizartion for HAServiceProtocol. (jitendra)
|
||||||
HDFS-2809. Add test to verify that delegation tokens are honored after failover. (jitendra and atm)
|
HDFS-2809. Add test to verify that delegation tokens are honored after failover. (jitendra and atm)
|
||||||
|
|
||||||
HDFS-2838. NPE in FSNamesystem when in safe mode. (Gregory Chanan via eli)
|
HDFS-2838. NPE in FSNamesystem when in safe mode. (Gregory Chanan via eli)
|
||||||
|
|
||||||
|
HDFS-2805. Add a test for a federated cluster with HA NNs. (Brandon Li via jitendra)
|
||||||
|
|
|
@ -586,11 +586,19 @@ public class MiniDFSCluster {
|
||||||
conf.set(FS_DEFAULT_NAME_KEY, "127.0.0.1:" + onlyNN.getIpcPort());
|
conf.set(FS_DEFAULT_NAME_KEY, "127.0.0.1:" + onlyNN.getIpcPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we have more than one nameservice, need to enumerate them in the
|
||||||
|
// config.
|
||||||
|
if (federation) {
|
||||||
|
List<String> allNsIds = Lists.newArrayList();
|
||||||
|
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
||||||
|
allNsIds.add(nameservice.getId());
|
||||||
|
}
|
||||||
|
conf.set(DFS_FEDERATION_NAMESERVICES, Joiner.on(",").join(allNsIds));
|
||||||
|
}
|
||||||
|
|
||||||
int nnCounter = 0;
|
int nnCounter = 0;
|
||||||
List<String> nsIds = Lists.newArrayList();
|
|
||||||
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
||||||
String nsId = nameservice.getId();
|
String nsId = nameservice.getId();
|
||||||
nsIds.add(nameservice.getId());
|
|
||||||
|
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
!federation || nsId != null,
|
!federation || nsId != null,
|
||||||
|
@ -643,6 +651,7 @@ public class MiniDFSCluster {
|
||||||
}
|
}
|
||||||
prevNNDirs = FSNamesystem.getNamespaceDirs(conf);
|
prevNNDirs = FSNamesystem.getNamespaceDirs(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start all Namenodes
|
// Start all Namenodes
|
||||||
for (NNConf nn : nameservice.getNNs()) {
|
for (NNConf nn : nameservice.getNNs()) {
|
||||||
initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, nnCounter);
|
initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, nnCounter);
|
||||||
|
@ -651,12 +660,7 @@ public class MiniDFSCluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
if (federation) {
|
|
||||||
// If we have more than one nameservice, need to enumerate them in the
|
|
||||||
// config.
|
|
||||||
conf.set(DFS_FEDERATION_NAMESERVICES, Joiner.on(",").join(nsIds));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public URI getSharedEditsDir(int minNN, int maxNN) throws IOException {
|
public URI getSharedEditsDir(int minNN, int maxNN) throws IOException {
|
||||||
|
|
|
@ -77,6 +77,22 @@ public class MiniDFSNNTopology {
|
||||||
return topology;
|
return topology;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up federated cluster with the given number of nameservices, each
|
||||||
|
* of which has two NameNodes.
|
||||||
|
*/
|
||||||
|
public static MiniDFSNNTopology simpleHAFederatedTopology(
|
||||||
|
int numNameservices) {
|
||||||
|
MiniDFSNNTopology topology = new MiniDFSNNTopology();
|
||||||
|
for (int i = 0; i < numNameservices; i++) {
|
||||||
|
topology.addNameservice(new MiniDFSNNTopology.NSConf("ns" + i)
|
||||||
|
.addNN(new MiniDFSNNTopology.NNConf("nn0"))
|
||||||
|
.addNN(new MiniDFSNNTopology.NNConf("nn1")));
|
||||||
|
}
|
||||||
|
topology.setFederation(true);
|
||||||
|
return topology;
|
||||||
|
}
|
||||||
|
|
||||||
public MiniDFSNNTopology setFederation(boolean federation) {
|
public MiniDFSNNTopology setFederation(boolean federation) {
|
||||||
this.federation = federation;
|
this.federation = federation;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -131,18 +131,36 @@ public abstract class HATestUtil {
|
||||||
/** Gets the filesystem instance by setting the failover configurations */
|
/** Gets the filesystem instance by setting the failover configurations */
|
||||||
public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
|
public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
|
||||||
throws IOException, URISyntaxException {
|
throws IOException, URISyntaxException {
|
||||||
|
return configureFailoverFs(cluster, conf, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the filesystem instance by setting the failover configurations
|
||||||
|
* @param cluster the single process DFS cluster
|
||||||
|
* @param conf cluster configuration
|
||||||
|
* @param nsIndex namespace index starting with zero
|
||||||
|
* @throws IOException if an error occurs rolling the edit log
|
||||||
|
*/
|
||||||
|
public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
|
||||||
|
int nsIndex) throws IOException, URISyntaxException {
|
||||||
conf = new Configuration(conf);
|
conf = new Configuration(conf);
|
||||||
String logicalName = getLogicalHostname(cluster);
|
String logicalName = getLogicalHostname(cluster);
|
||||||
setFailoverConfigurations(cluster, conf, logicalName);
|
setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
|
||||||
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
|
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
|
||||||
return fs;
|
return fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Sets the required configurations for performing failover */
|
/** Sets the required configurations for performing failover of default namespace. */
|
||||||
public static void setFailoverConfigurations(MiniDFSCluster cluster,
|
public static void setFailoverConfigurations(MiniDFSCluster cluster,
|
||||||
Configuration conf, String logicalName) {
|
Configuration conf, String logicalName) {
|
||||||
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
|
setFailoverConfigurations(cluster, conf, logicalName, 0);
|
||||||
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
|
}
|
||||||
|
|
||||||
|
/** Sets the required configurations for performing failover. */
|
||||||
|
public static void setFailoverConfigurations(MiniDFSCluster cluster,
|
||||||
|
Configuration conf, String logicalName, int nsIndex) {
|
||||||
|
InetSocketAddress nnAddr1 = cluster.getNameNode(2 * nsIndex).getNameNodeAddress();
|
||||||
|
InetSocketAddress nnAddr2 = cluster.getNameNode(2 * nsIndex + 1).getNameNodeAddress();
|
||||||
String nameNodeId1 = "nn1";
|
String nameNodeId1 = "nn1";
|
||||||
String nameNodeId2 = "nn2";
|
String nameNodeId2 = "nn2";
|
||||||
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
|
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
|
||||||
|
|
|
@ -106,6 +106,46 @@ public class TestHAStateTransitions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test manual failover failback for one namespace
|
||||||
|
* @param cluster single process test cluster
|
||||||
|
* @param conf cluster configuration
|
||||||
|
* @param nsIndex namespace index starting from zero
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private void testManualFailoverFailback(MiniDFSCluster cluster,
|
||||||
|
Configuration conf, int nsIndex) throws Exception {
|
||||||
|
int nn0 = 2 * nsIndex, nn1 = 2 * nsIndex + 1;
|
||||||
|
|
||||||
|
cluster.transitionToActive(nn0);
|
||||||
|
|
||||||
|
LOG.info("Starting with NN 0 active in namespace " + nsIndex);
|
||||||
|
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||||
|
fs.mkdirs(TEST_DIR);
|
||||||
|
|
||||||
|
LOG.info("Failing over to NN 1 in namespace " + nsIndex);
|
||||||
|
cluster.transitionToStandby(nn0);
|
||||||
|
cluster.transitionToActive(nn1);
|
||||||
|
assertTrue(fs.exists(TEST_DIR));
|
||||||
|
DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
|
||||||
|
|
||||||
|
LOG.info("Failing over to NN 0 in namespace " + nsIndex);
|
||||||
|
cluster.transitionToStandby(nn1);
|
||||||
|
cluster.transitionToActive(nn0);
|
||||||
|
assertTrue(fs.exists(TEST_DIR));
|
||||||
|
assertEquals(TEST_FILE_DATA,
|
||||||
|
DFSTestUtil.readFile(fs, TEST_FILE_PATH));
|
||||||
|
|
||||||
|
LOG.info("Removing test file");
|
||||||
|
fs.delete(TEST_DIR, true);
|
||||||
|
assertFalse(fs.exists(TEST_DIR));
|
||||||
|
|
||||||
|
LOG.info("Failing over to NN 1 in namespace " + nsIndex);
|
||||||
|
cluster.transitionToStandby(nn0);
|
||||||
|
cluster.transitionToActive(nn1);
|
||||||
|
assertFalse(fs.exists(TEST_DIR));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests manual failover back and forth between two NameNodes.
|
* Tests manual failover back and forth between two NameNodes.
|
||||||
*/
|
*/
|
||||||
|
@ -118,34 +158,8 @@ public class TestHAStateTransitions {
|
||||||
.build();
|
.build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
cluster.transitionToActive(0);
|
// test the only namespace
|
||||||
|
testManualFailoverFailback(cluster, conf, 0);
|
||||||
LOG.info("Starting with NN 0 active");
|
|
||||||
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
|
|
||||||
fs.mkdirs(TEST_DIR);
|
|
||||||
|
|
||||||
LOG.info("Failing over to NN 1");
|
|
||||||
cluster.transitionToStandby(0);
|
|
||||||
cluster.transitionToActive(1);
|
|
||||||
assertTrue(fs.exists(TEST_DIR));
|
|
||||||
DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
|
|
||||||
|
|
||||||
LOG.info("Failing over to NN 0");
|
|
||||||
cluster.transitionToStandby(1);
|
|
||||||
cluster.transitionToActive(0);
|
|
||||||
assertTrue(fs.exists(TEST_DIR));
|
|
||||||
assertEquals(TEST_FILE_DATA,
|
|
||||||
DFSTestUtil.readFile(fs, TEST_FILE_PATH));
|
|
||||||
|
|
||||||
LOG.info("Removing test file");
|
|
||||||
fs.delete(TEST_DIR, true);
|
|
||||||
assertFalse(fs.exists(TEST_DIR));
|
|
||||||
|
|
||||||
LOG.info("Failing over to NN 1");
|
|
||||||
cluster.transitionToStandby(0);
|
|
||||||
cluster.transitionToActive(1);
|
|
||||||
assertFalse(fs.exists(TEST_DIR));
|
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -294,4 +308,28 @@ public class TestHAStateTransitions {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests manual failover back and forth between two NameNodes
|
||||||
|
* for federation cluster with two namespaces.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testManualFailoverFailbackFederationHA() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
|
||||||
|
.numDataNodes(1)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// test for namespace 0
|
||||||
|
testManualFailoverFailback(cluster, conf, 0);
|
||||||
|
|
||||||
|
// test for namespace 1
|
||||||
|
testManualFailoverFailback(cluster, conf, 1);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue