HDFS-2591. MiniDFSCluster support to mix and match federation with HA. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1208297 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-11-30 06:49:30 +00:00
parent ee97365f58
commit bb91ebb53c
17 changed files with 449 additions and 168 deletions

View File

@ -25,3 +25,5 @@ HDFS-2523. Small NN fixes to include HAServiceProtocol and prevent NPE on shutdo
HDFS-2577. NN fails to start since it tries to start secret manager in safemode. (todd)
HDFS-2582. Scope dfs.ha.namenodes config by nameservice (todd)
HDFS-2591. MiniDFSCluster support to mix and match federation with HA (todd)

View File

@ -571,6 +571,12 @@ public String getNamenodeId() {
public InetSocketAddress getAddress() {
return addr;
}
@Override
public String toString() {
return "ConfiguredNNAddress[nsId=" + nameserviceId + ";" +
"nnId=" + namenodeId + ";addr=" + addr + "]";
}
}
/**

View File

@ -32,6 +32,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -41,7 +42,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -65,6 +72,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
@ -77,6 +85,11 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
/**
* This class creates a single-process DFS cluster for junit testing.
* The data directories for non-simulated DFS are under the testing directory.
@ -102,7 +115,6 @@ public static class Builder {
private int nameNodePort = 0;
private int nameNodeHttpPort = 0;
private final Configuration conf;
private int numNameNodes = 1;
private int numDataNodes = 1;
private boolean format = true;
private boolean manageNameDfsDirs = true;
@ -114,21 +126,12 @@ public static class Builder {
private String clusterId = null;
private boolean waitSafeMode = true;
private boolean setupHostsFile = false;
private boolean federation = false;
private MiniDFSNNTopology nnTopology = null;
public Builder(Configuration conf) {
this.conf = conf;
}
/**
* default false - non federated cluster
* @param val
* @return Builder object
*/
public Builder federation (boolean val){
this.federation = val;
return this;
}
/**
* Default: 0
*/
@ -145,14 +148,6 @@ public Builder nameNodeHttpPort(int val) {
return this;
}
/**
* Default: 1
*/
public Builder numNameNodes(int val) {
this.numNameNodes = val;
return this;
}
/**
* Default: 1
*/
@ -242,6 +237,16 @@ public Builder setupHostsFile(boolean val) {
return this;
}
/**
* Default: a single namenode.
* See {@link MiniDFSNNTopology#simpleFederatedTopology(int)} to set up
* federated nameservices
*/
public Builder nnTopology(MiniDFSNNTopology topology) {
this.nnTopology = topology;
return this;
}
/**
* Construct the actual MiniDFSCluster
*/
@ -254,15 +259,17 @@ public MiniDFSCluster build() throws IOException {
* Used by builder to create and return an instance of MiniDFSCluster
*/
private MiniDFSCluster(Builder builder) throws IOException {
LOG.info("starting cluster with " + builder.numNameNodes + " namenodes.");
nameNodes = new NameNodeInfo[builder.numNameNodes];
// try to determine if in federation mode
if(builder.numNameNodes > 1)
builder.federation = true;
if (builder.nnTopology == null) {
// If no topology is specified, build a single NN.
builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
builder.nameNodePort, builder.nameNodeHttpPort);
}
LOG.info("starting cluster with " +
builder.nnTopology.countNameNodes() + " namenodes.");
nameNodes = new NameNodeInfo[builder.nnTopology.countNameNodes()];
initMiniDFSCluster(builder.nameNodePort,
builder.nameNodeHttpPort,
builder.conf,
initMiniDFSCluster(builder.conf,
builder.numDataNodes,
builder.format,
builder.manageNameDfsDirs,
@ -274,7 +281,7 @@ private MiniDFSCluster(Builder builder) throws IOException {
builder.clusterId,
builder.waitSafeMode,
builder.setupHostsFile,
builder.federation);
builder.nnTopology);
}
public class DataNodeProperties {
@ -296,8 +303,8 @@ public class DataNodeProperties {
new ArrayList<DataNodeProperties>();
private File base_dir;
private File data_dir;
private boolean federation = false;
private boolean waitSafeMode = true;
private boolean federation;
/**
* Stores the information related to a namenode in the cluster
@ -488,22 +495,23 @@ public MiniDFSCluster(int nameNodePort,
String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
initMiniDFSCluster(nameNodePort, 0, conf, numDataNodes, format,
initMiniDFSCluster(conf, numDataNodes, format,
manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
simulatedCapacities, null, true, false, false);
simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0));
}
private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort,
private void initMiniDFSCluster(
Configuration conf,
int numDataNodes, boolean format, boolean manageNameDfsDirs,
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile, boolean federation)
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology)
throws IOException {
this.conf = conf;
base_dir = new File(determineDfsBaseDir());
data_dir = new File(base_dir, "data");
this.federation = federation;
this.waitSafeMode = waitSafeMode;
// use alternate RPC engine if spec'd
@ -538,28 +546,9 @@ private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort,
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
Collection<String> nameserviceIds = DFSUtil.getNameServiceIds(conf);
if(nameserviceIds.size() > 1)
federation = true;
if (!federation) {
conf.set(FS_DEFAULT_NAME_KEY, "127.0.0.1:" + nameNodePort);
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:"
+ nameNodeHttpPort);
NameNode nn = createNameNode(0, conf, numDataNodes, manageNameDfsDirs,
format, operation, clusterId);
nameNodes[0] = new NameNodeInfo(nn, conf);
FileSystem.setDefaultUri(conf, getURI(0));
} else {
if (nameserviceIds.isEmpty()) {
for (int i = 0; i < nameNodes.length; i++) {
nameserviceIds.add(NAMESERVICE_ID_PREFIX + i);
}
}
initFederationConf(conf, nameserviceIds, numDataNodes, nameNodePort);
createFederationNamenodes(conf, nameserviceIds, manageNameDfsDirs, format,
operation, clusterId);
}
federation = nnTopology.isFederated();
createNameNodesAndSetConf(
nnTopology, manageNameDfsDirs, format, operation, clusterId, conf);
if (format) {
if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
@ -575,51 +564,91 @@ private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort,
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
/** Initialize configuration for federated cluster */
private static void initFederationConf(Configuration conf,
Collection<String> nameserviceIds, int numDataNodes, int nnPort) {
String nameserviceIdList = "";
for (String nameserviceId : nameserviceIds) {
// Create comma separated list of nameserviceIds
if (nameserviceIdList.length() > 0) {
nameserviceIdList += ",";
}
nameserviceIdList += nameserviceId;
initFederatedNamenodeAddress(conf, nameserviceId, nnPort);
nnPort = nnPort == 0 ? 0 : nnPort + 2;
private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
boolean manageNameDfsDirs, boolean format, StartupOption operation,
String clusterId,
Configuration conf) throws IOException {
Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
"empty NN topology: no namenodes specified!");
if (!federation && nnTopology.countNameNodes() == 1) {
NNConf onlyNN = nnTopology.getOnlyNameNode();
// we only had one NN, set DEFAULT_NAME for it
conf.set(FS_DEFAULT_NAME_KEY, "127.0.0.1:" + onlyNN.getIpcPort());
}
conf.set(DFS_FEDERATION_NAMESERVICES, nameserviceIdList);
}
/* For federated namenode initialize the address:port */
private static void initFederatedNamenodeAddress(Configuration conf,
String nameserviceId, int nnPort) {
// Set nameserviceId specific key
String key = DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId);
conf.set(key, "127.0.0.1:0");
key = DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId);
conf.set(key, "127.0.0.1:" + nnPort);
}
private void createFederationNamenodes(Configuration conf,
Collection<String> nameserviceIds, boolean manageNameDfsDirs,
boolean format, StartupOption operation, String clusterId)
throws IOException {
// Create namenodes in the cluster
int nnCounter = 0;
for (String nameserviceId : nameserviceIds) {
createFederatedNameNode(nnCounter++, conf, numDataNodes, manageNameDfsDirs,
format, operation, clusterId, nameserviceId);
List<String> nsIds = Lists.newArrayList();
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
String nsId = nameservice.getId();
nsIds.add(nameservice.getId());
Preconditions.checkArgument(
!federation || nsId != null,
"if there is more than one NS, they must have names");
// First set up the configuration which all of the NNs
// need to have - have to do this a priori before starting
// *any* of the NNs, so they know to come up in standby.
List<String> nnIds = Lists.newArrayList();
// Iterate over the NNs in this nameservice
for (NNConf nn : nameservice.getNNs()) {
nnIds.add(nn.getNnId());
initNameNodeAddress(conf, nameservice.getId(), nn);
}
// If HA is enabled on this nameservice, enumerate all the namenodes
// in the configuration. Also need to set a shared edits dir
if (nnIds.size() > 1) {
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nameservice.getId()),
Joiner.on(",").join(nnIds));
if (manageNameDfsDirs) {
URI sharedEditsUri = fileAsURI(new File(base_dir, "shared-edits-" +
nnCounter + "-through-" + (nnCounter+nnIds.size()-1)));
// TODO in HDFS-1971: conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
}
}
// Now start all the NNs in this nameservice.
int i = 0;
for (NNConf nn : nameservice.getNNs()) {
initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, nnCounter);
boolean formatThisOne = format;
if (format && i++ > 0) {
// Don't format the second NN in an HA setup - that
// would result in it having a different clusterID,
// block pool ID, etc. Instead, copy the name dirs
// from the first one.
formatThisOne = false;
copyNameDirs(getConfiguration(nnCounter - 1), conf);
}
createNameNode(nnCounter++, conf, numDataNodes, formatThisOne,
operation, clusterId, nsId, nn.getNnId());
}
}
if (federation) {
// If we have more than one nameservice, need to enumerate them in the
// config.
conf.set(DFS_FEDERATION_NAMESERVICES, Joiner.on(",").join(nsIds));
}
}
private NameNode createNameNode(int nnIndex, Configuration conf,
int numDataNodes, boolean manageNameDfsDirs, boolean format,
StartupOption operation, String clusterId)
private void initNameNodeConf(Configuration conf,
String nameserviceId, String nnId,
boolean manageNameDfsDirs, int nnIndex)
throws IOException {
if (nameserviceId != null) {
conf.set(DFS_FEDERATION_NAMESERVICE_ID, nameserviceId);
}
if (nnId != null) {
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
}
if (manageNameDfsDirs) {
conf.set(DFS_NAMENODE_NAME_DIR_KEY,
fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
@ -628,7 +657,49 @@ private NameNode createNameNode(int nnIndex, Configuration conf,
fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
}
}
private void copyNameDirs(Configuration srcConf, Configuration dstConf)
throws IOException {
Collection<URI> srcDirs = FSNamesystem.getNamespaceDirs(srcConf);
Collection<URI> dstDirs = FSNamesystem.getNamespaceDirs(dstConf);
URI srcDir = Lists.newArrayList(srcDirs).get(0);
FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();
for (URI dstDir : dstDirs) {
Preconditions.checkArgument(!dstDir.equals(srcDir));
Files.deleteRecursively(new File(dstDir));
LOG.info("Copying namedir from primary node dir "
+ srcDir + " to " + dstDir);
FileUtil.copy(
new File(srcDir),
dstFS, new Path(dstDir), false, dstConf);
}
}
/**
* Initialize the address and port for this NameNode. In the
* non-federated case, the nameservice and namenode ID may be
* null.
*/
private static void initNameNodeAddress(Configuration conf,
String nameserviceId, NNConf nnConf) {
// Set NN-specific specific key
String key = DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId,
nnConf.getNnId());
conf.set(key, "127.0.0.1:" + nnConf.getHttpPort());
key = DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId,
nnConf.getNnId());
conf.set(key, "127.0.0.1:" + nnConf.getIpcPort());
}
private void createNameNode(int nnIndex, Configuration conf,
int numDataNodes, boolean format, StartupOption operation,
String clusterId, String nameserviceId,
String nnId)
throws IOException {
// Format and clean out DataNode directories
if (format) {
DFSTestUtil.formatNameNode(conf);
@ -642,23 +713,17 @@ private NameNode createNameNode(int nnIndex, Configuration conf,
operation == StartupOption.FORMAT ||
operation == StartupOption.REGULAR) ?
new String[] {} : new String[] {operation.getName()};
return NameNode.createNameNode(args, conf);
}
private void createFederatedNameNode(int nnIndex, Configuration conf,
int numDataNodes, boolean manageNameDfsDirs, boolean format,
StartupOption operation, String clusterId, String nameserviceId)
throws IOException {
conf.set(DFS_FEDERATION_NAMESERVICE_ID, nameserviceId);
NameNode nn = createNameNode(nnIndex, conf, numDataNodes, manageNameDfsDirs,
format, operation, clusterId);
NameNode nn = NameNode.createNameNode(args, conf);
// After the NN has started, set back the bound ports into
// the conf
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId), NameNode
DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId, nnId), NameNode
.getHostPortString(nn.getNameNodeAddress()));
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId), NameNode
DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId, nnId), NameNode
.getHostPortString(nn.getHttpAddress()));
DFSUtil.setGenericConf(conf, nameserviceId,
DFSUtil.setGenericConf(conf, nameserviceId, nnId,
DFS_NAMENODE_HTTP_ADDRESS_KEY);
nameNodes[nnIndex] = new NameNodeInfo(nn, new Configuration(conf));
}
@ -1110,6 +1175,7 @@ public void shutdown() {
LOG.info("Shutting down the Mini HDFS Cluster");
shutdownDataNodes();
for (NameNodeInfo nnInfo : nameNodes) {
if (nnInfo == null) continue;
NameNode nameNode = nnInfo.nameNode;
if (nameNode != null) {
nameNode.stop();
@ -1380,14 +1446,7 @@ public boolean isNameNodeUp(int nnIndex) {
return false;
}
long[] sizes;
try {
sizes = nameNode.getRpcServer().getStats();
} catch (IOException ioe) {
// This method above should never throw.
// It only throws IOE since it is exposed via RPC
throw (AssertionError)(new AssertionError("Unexpected IOE thrown: "
+ StringUtils.stringifyException(ioe)).initCause(ioe));
}
sizes = NameNodeAdapter.getStats(nameNode.getNamesystem());
boolean isUp = false;
synchronized (this) {
isUp = ((!nameNode.isInSafeMode() || !waitSafeMode) && sizes[0] != 0);
@ -1497,6 +1556,22 @@ public Collection<URI> getNameDirs(int nnIndex) {
public Collection<URI> getNameEditsDirs(int nnIndex) {
return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
}
private HAServiceProtocol getHaServiceClient(int nnIndex) throws IOException {
InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
return RPC.getProxy(HAServiceProtocol.class,
HAServiceProtocol.versionID, addr, conf);
}
public void transitionToActive(int nnIndex) throws IOException,
ServiceFailedException {
getHaServiceClient(nnIndex).transitionToActive();
}
public void transitionToStandby(int nnIndex) throws IOException,
ServiceFailedException {
getHaServiceClient(nnIndex).transitionToActive();
}
/** Wait until the given namenode gets registration from all the datanodes */
public void waitActive(int nnIndex) throws IOException {
@ -1504,6 +1579,7 @@ public void waitActive(int nnIndex) throws IOException {
return;
}
InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
assert addr.getPort() != 0;
DFSClient client = new DFSClient(addr, conf);
// ensure all datanodes have registered and sent heartbeat to the namenode
@ -1902,7 +1978,7 @@ public NameNode addNameNode(Configuration conf, int namenodePort)
throws IOException {
if(!federation)
throw new IOException("cannot add namenode to non-federated cluster");
int nnIndex = nameNodes.length;
int numNameNodes = nameNodes.length + 1;
NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
@ -1913,10 +1989,13 @@ public NameNode addNameNode(Configuration conf, int namenodePort)
String nameserviceIds = conf.get(DFS_FEDERATION_NAMESERVICES);
nameserviceIds += "," + nameserviceId;
conf.set(DFS_FEDERATION_NAMESERVICES, nameserviceIds);
initFederatedNamenodeAddress(conf, nameserviceId, namenodePort);
createFederatedNameNode(nnIndex, conf, numDataNodes, true, true, null,
null, nameserviceId);
String nnId = null;
initNameNodeAddress(conf, nameserviceId,
new NNConf(nnId).setIpcPort(namenodePort));
initNameNodeConf(conf, nameserviceId, nnId, true, nnIndex);
createNameNode(nnIndex, conf, numDataNodes, true, null, null,
nameserviceId, nnId);
// Refresh datanodes with the newly started namenode
for (DataNodeProperties dn : dataNodes) {

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* This class is used to specify the setup of namenodes when instantiating
* a MiniDFSCluster. It consists of a set of nameservices, each of which
* may have one or more namenodes (in the case of HA)
*/
@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "Hive", "MapReduce", "Pig"})
@InterfaceStability.Unstable
public class MiniDFSNNTopology {
private final List<NSConf> nameservices = Lists.newArrayList();
private boolean federation;
public MiniDFSNNTopology() {
}
/**
* Set up a simple non-federated non-HA NN.
*/
public static MiniDFSNNTopology simpleSingleNN(
int nameNodePort, int nameNodeHttpPort) {
return new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf(null)
.addNN(new MiniDFSNNTopology.NNConf(null)
.setHttpPort(nameNodeHttpPort)
.setIpcPort(nameNodePort)));
}
/**
* Set up federated cluster with the given number of nameservices, each
* of which has only a single NameNode.
*/
public static MiniDFSNNTopology simpleFederatedTopology(
int numNameservices) {
MiniDFSNNTopology topology = new MiniDFSNNTopology();
for (int i = 1; i <= numNameservices; i++) {
topology.addNameservice(new MiniDFSNNTopology.NSConf("ns" + i)
.addNN(new MiniDFSNNTopology.NNConf(null)));
}
topology.setFederation(true);
return topology;
}
public MiniDFSNNTopology setFederation(boolean federation) {
this.federation = federation;
return this;
}
public MiniDFSNNTopology addNameservice(NSConf nameservice) {
Preconditions.checkArgument(!nameservice.getNNs().isEmpty(),
"Must have at least one NN in a nameservice");
this.nameservices.add(nameservice);
return this;
}
public int countNameNodes() {
int count = 0;
for (NSConf ns : nameservices) {
count += ns.nns.size();
}
return count;
}
public NNConf getOnlyNameNode() {
Preconditions.checkState(countNameNodes() == 1,
"must have exactly one NN!");
return nameservices.get(0).getNNs().get(0);
}
public boolean isFederated() {
return nameservices.size() > 1 || federation;
}
public List<NSConf> getNameservices() {
return nameservices;
}
public static class NSConf {
private final String id;
private final List<NNConf> nns = Lists.newArrayList();
public NSConf(String id) {
this.id = id;
}
public NSConf addNN(NNConf nn) {
this.nns.add(nn);
return this;
}
public String getId() {
return id;
}
public List<NNConf> getNNs() {
return nns;
}
}
public static class NNConf {
private String nnId;
private int httpPort;
private int ipcPort;
public NNConf(String nnId) {
this.nnId = nnId;
}
String getNnId() {
return nnId;
}
int getIpcPort() {
return ipcPort;
}
int getHttpPort() {
return httpPort;
}
public NNConf setHttpPort(int httpPort) {
this.httpPort = httpPort;
return this;
}
public NNConf setIpcPort(int ipcPort) {
this.ipcPort = ipcPort;
return this;
}
}
}

View File

@ -46,7 +46,9 @@ public class TestDFSClientFailover {
@Before
public void setUpCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2).build();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build();
cluster.waitActive();
}
@ -61,7 +63,6 @@ public void tearDownCluster() throws IOException {
// changed to exercise that.
@Test
public void testDfsClientFailover() throws IOException, URISyntaxException {
final String logicalNameNodeId = "ha-nn-uri";
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
String nameServiceId1 = DFSUtil.getNameServiceIdFromAddress(conf, nnAddr1,
@ -69,9 +70,6 @@ public void testDfsClientFailover() throws IOException, URISyntaxException {
String nameServiceId2 = DFSUtil.getNameServiceIdFromAddress(conf, nnAddr2,
DFS_NAMENODE_RPC_ADDRESS_KEY);
String nameNodeId1 = "nn1";
String nameNodeId2 = "nn2";
ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf);
ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf);
@ -85,14 +83,22 @@ public void testDfsClientFailover() throws IOException, URISyntaxException {
out1.close();
out2.close();
String nsId = "nameserviceId1";
final String logicalNameNodeId = "ha-nn-uri";
String nameNodeId1 = "nn1";
String nameNodeId2 = "nn2";
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nameServiceId1, nameNodeId1), address1);
nsId, nameNodeId1), address1);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nameServiceId2, nameNodeId2), address2);
nsId, nameNodeId2), address2);
conf.set(DFS_HA_NAMENODES_KEY, nameNodeId1 + "," + nameNodeId2);
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId),
nameNodeId1 + "," + nameNodeId2);
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalNameNodeId,
ConfiguredFailoverProxyProvider.class.getName());

View File

@ -279,7 +279,8 @@ private static void validateCluster(DFSClient client, int numDNs)
* @throws IOException */
private void startCluster(int numNameNodes, int numDatanodes,
Configuration conf) throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numNameNodes(numNameNodes)
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
.numDataNodes(numDatanodes).build();
cluster.waitActive();
for (int i = 0; i < numNameNodes; i++) {
@ -507,7 +508,8 @@ public void testHostsFile(int numNameNodes) throws IOException,
InterruptedException {
conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
int numDatanodes = 1;
cluster = new MiniDFSCluster.Builder(conf).numNameNodes(numNameNodes)
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
.numDataNodes(numDatanodes).setupHostsFile(true).build();
cluster.waitActive();

View File

@ -41,7 +41,8 @@ public class TestMiniDFSCluster {
protected File testDataDir;
@Before
public void setUp() {
testDataPath = System.getProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA);
testDataPath = System.getProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
"build/test/data");
testDataDir = new File(new File(testDataPath).getParentFile(),
"miniclusters");

View File

@ -370,7 +370,7 @@ public void testBlockTokenInLastLocatedBlock() throws IOException,
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(1)
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
cluster.waitActive();

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -254,7 +255,7 @@ private void unevenDistribution(final int nNameNodes,
{
LOG.info("UNEVEN 1");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numNameNodes(nNameNodes)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(nDataNodes)
.racks(racks)
.simulatedCapacities(capacities)
@ -275,7 +276,7 @@ private void unevenDistribution(final int nNameNodes,
{
LOG.info("UNEVEN 10");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numNameNodes(nNameNodes)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes)
.racks(racks)
.simulatedCapacities(capacities)
@ -329,7 +330,7 @@ private void runTest(final int nNameNodes, long[] capacities, String[] racks,
LOG.info("RUN_TEST -1");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numNameNodes(nNameNodes)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes)
.racks(racks)
.simulatedCapacities(capacities)

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -36,7 +37,6 @@
* Tests if DataNode process exits if all Block Pool services exit.
*/
public class TestDataNodeExit {
private static int BASEPORT = 9923;
private static long WAIT_TIME_IN_MILLIS = 10;
Configuration conf;
MiniDFSCluster cluster = null;
@ -46,8 +46,9 @@ public void setUp() throws IOException {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 100);
cluster = new MiniDFSCluster.Builder(conf).numNameNodes(3)
.nameNodePort(BASEPORT).build();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
.build();
for (int i = 0; i < 3; i++) {
cluster.waitActive(i);
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
@ -56,8 +57,9 @@ public void setUp() throws Exception {
*/
@Test
public void test2NNRegistration() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2)
.nameNodePort(9928).build();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build();
try {
cluster.waitActive();
NameNode nn1 = cluster.getNameNode(0);
@ -180,8 +182,9 @@ public void testFedSingleNN() throws IOException {
@Test
public void testClusterIdMismatch() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2).
nameNodePort(9928).build();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build();
try {
cluster.waitActive();
@ -216,25 +219,27 @@ public void testMiniDFSClusterWithMultipleNN() throws IOException {
Configuration conf = new HdfsConfiguration();
// start Federated cluster and add a node.
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2).
nameNodePort(9928).build();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build();
Assert.assertNotNull(cluster);
Assert.assertEquals("(1)Should be 2 namenodes", 2, cluster.getNumNameNodes());
// add a node
cluster.addNameNode(conf, 9929);
cluster.addNameNode(conf, 0);
Assert.assertEquals("(1)Should be 3 namenodes", 3, cluster.getNumNameNodes());
cluster.shutdown();
// 2. start with Federation flag set
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).federation(true).
nameNodePort(9928).build();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1))
.build();
Assert.assertNotNull(cluster);
Assert.assertEquals("(2)Should be 1 namenodes", 1, cluster.getNumNameNodes());
// add a node
cluster.addNameNode(conf, 9929);
cluster.addNameNode(conf, 0);
Assert.assertEquals("(2)Should be 2 namenodes", 2, cluster.getNumNameNodes());
cluster.shutdown();

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.junit.Test;
@ -47,8 +48,9 @@ public void testDeleteBlockPool() throws Exception {
try {
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
"namesServerId1,namesServerId2");
cluster = new MiniDFSCluster.Builder(conf).federation(true).numNameNodes(
2).numDataNodes(2).build();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(2).build();
cluster.waitActive();
@ -155,8 +157,9 @@ public void testDfsAdminDeleteBlockPool() throws Exception {
try {
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
"namesServerId1,namesServerId2");
cluster = new MiniDFSCluster.Builder(conf).federation(true).numNameNodes(
2).numDataNodes(1).build();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(1).build();
cluster.waitActive();

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.junit.Test;
@ -41,12 +42,13 @@ public class TestMulitipleNNDataBlockScanner {
String bpids[] = new String[3];
FileSystem fs[] = new FileSystem[3];
public void setUp(int port) throws IOException {
public void setUp() throws IOException {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 100);
cluster = new MiniDFSCluster.Builder(conf).numNameNodes(3).nameNodePort(
port).build();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
.build();
for (int i = 0; i < 3; i++) {
cluster.waitActive(i);
}
@ -65,7 +67,7 @@ public void setUp(int port) throws IOException {
@Test
public void testDataBlockScanner() throws IOException, InterruptedException {
setUp(9923);
setUp();
try {
DataNode dn = cluster.getDataNodes().get(0);
for (int i = 0; i < 3; i++) {
@ -89,9 +91,10 @@ public void testDataBlockScanner() throws IOException, InterruptedException {
@Test
public void testBlockScannerAfterRefresh() throws IOException,
InterruptedException {
setUp(9933);
setUp();
try {
Configuration conf = new HdfsConfiguration(cluster.getConfiguration(0));
Configuration dnConf = cluster.getDataNodes().get(0).getConf();
Configuration conf = new HdfsConfiguration(dnConf);
StringBuilder namenodesBuilder = new StringBuilder();
String bpidToShutdown = cluster.getNamesystem(2).getBlockPoolId();
@ -140,7 +143,7 @@ public void testBlockScannerAfterRefresh() throws IOException,
@Test
public void testBlockScannerAfterRestart() throws IOException,
InterruptedException {
setUp(9943);
setUp();
try {
cluster.restartDataNode(0);
cluster.waitActive();

View File

@ -26,6 +26,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
import org.junit.Test;
/**
@ -43,9 +46,13 @@ public void testRefreshNamenodes() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, "namesServerId1");
cluster = new MiniDFSCluster.Builder(conf).federation(true).
numNameNodes(1).nameNodePort(nnPort1).build();
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new NSConf("ns1").addNN(
new NNConf(null).setIpcPort(nnPort1)))
.setFederation(true);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.build();
DataNode dn = cluster.getDataNodes().get(0);
assertEquals(1, dn.getAllBpOs().length);

View File

@ -97,4 +97,11 @@ public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
ns.readUnlock();
}
}
/**
* Return the FSNamesystem stats
*/
public static long[] getStats(final FSNamesystem fsn) {
return fsn.getStats();
}
}

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@ -1072,8 +1073,9 @@ public void testMultipleSecondaryNamenodes() throws IOException {
String nameserviceId2 = "ns2";
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceId1
+ "," + nameserviceId2);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2)
.nameNodePort(9928).build();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build();
Configuration snConf1 = new HdfsConfiguration(cluster.getConfiguration(0));
Configuration snConf2 = new HdfsConfiguration(cluster.getConfiguration(1));
InetSocketAddress nn1RpcAddress =

View File

@ -514,11 +514,10 @@ public void testNNRestart() throws IOException, InterruptedException {
InetAddress inetAddress = InetAddress.getByAddress(b);
list.add(inetAddress.getHostName());
writeConfigFile(localFileSys, hostsFile, list);
int numNameNodes = 1;
int numDatanodes = 1;
try {
cluster = new MiniDFSCluster.Builder(conf).numNameNodes(numNameNodes)
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes).setupHostsFile(true).build();
cluster.waitActive();