Add missing files from HDFS-9005. (lei)
This commit is contained in:
parent
493648611d
commit
ae98314980
|
@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* This class represents the primary identifier for a Datanode.
|
||||
* Datanodes are identified by how they can be contacted (hostname
|
||||
|
@ -272,4 +274,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
|||
public int compareTo(DatanodeID that) {
|
||||
return getXferAddr().compareTo(that.getXferAddr());
|
||||
}
|
||||
|
||||
public InetSocketAddress getResolvedAddress() {
|
||||
return new InetSocketAddress(this.getIpAddr(), this.getXferPort());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -396,12 +396,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
|
||||
public static final String DFS_DATANODE_HOST_NAME_KEY =
|
||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
|
||||
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
|
||||
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
|
||||
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =
|
||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
|
||||
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY =
|
||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY;
|
||||
public static final String DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY =
|
||||
"dfs.namenode.hosts.provider.classname";
|
||||
public static final String DFS_HOSTS = "dfs.hosts";
|
||||
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
|
||||
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
|
||||
|
|
|
@ -108,7 +108,7 @@ public class DatanodeManager {
|
|||
private final int defaultIpcPort;
|
||||
|
||||
/** Read include/exclude files. */
|
||||
private final HostFileManager hostFileManager = new HostFileManager();
|
||||
private HostConfigManager hostConfigManager;
|
||||
|
||||
/** The period to wait for datanode heartbeat.*/
|
||||
private long heartbeatExpireInterval;
|
||||
|
@ -201,9 +201,11 @@ public class DatanodeManager {
|
|||
this.defaultIpcPort = NetUtils.createSocketAddr(
|
||||
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
|
||||
this.hostConfigManager = ReflectionUtils.newInstance(
|
||||
conf.getClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
|
||||
HostFileManager.class, HostConfigManager.class), conf);
|
||||
try {
|
||||
this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
||||
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
||||
this.hostConfigManager.refresh();
|
||||
} catch (IOException e) {
|
||||
LOG.error("error reading hosts files: ", e);
|
||||
}
|
||||
|
@ -221,7 +223,7 @@ public class DatanodeManager {
|
|||
// in the cache; so future calls to resolve will be fast.
|
||||
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
||||
final ArrayList<String> locations = new ArrayList<>();
|
||||
for (InetSocketAddress addr : hostFileManager.getIncludes()) {
|
||||
for (InetSocketAddress addr : hostConfigManager.getIncludes()) {
|
||||
locations.add(addr.getAddress().getHostAddress());
|
||||
}
|
||||
dnsToSwitchMapping.resolve(locations);
|
||||
|
@ -334,8 +336,8 @@ public class DatanodeManager {
|
|||
return decomManager;
|
||||
}
|
||||
|
||||
HostFileManager getHostFileManager() {
|
||||
return hostFileManager;
|
||||
public HostConfigManager getHostConfigManager() {
|
||||
return hostConfigManager;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -622,6 +624,7 @@ public class DatanodeManager {
|
|||
networktopology.add(node); // may throw InvalidTopologyException
|
||||
host2DatanodeMap.add(node);
|
||||
checkIfClusterIsNowMultiRack(node);
|
||||
resolveUpgradeDomain(node);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
|
||||
|
@ -694,7 +697,14 @@ public class DatanodeManager {
|
|||
return new HashMap<> (this.datanodesSoftwareVersions);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void resolveUpgradeDomain(DatanodeDescriptor node) {
|
||||
String upgradeDomain = hostConfigManager.getUpgradeDomain(node);
|
||||
if (upgradeDomain != null && upgradeDomain.length() > 0) {
|
||||
node.setUpgradeDomain(upgradeDomain);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve a node's network location. If the DNS to switch mapping fails
|
||||
* then this method guarantees default rack location.
|
||||
|
@ -821,7 +831,7 @@ public class DatanodeManager {
|
|||
*/
|
||||
void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
|
||||
// If the registered node is in exclude list, then decommission it
|
||||
if (getHostFileManager().isExcluded(nodeReg)) {
|
||||
if (getHostConfigManager().isExcluded(nodeReg)) {
|
||||
decomManager.startDecommission(nodeReg);
|
||||
}
|
||||
}
|
||||
|
@ -861,7 +871,7 @@ public class DatanodeManager {
|
|||
|
||||
// Checks if the node is not on the hosts list. If it is not, then
|
||||
// it will be disallowed from registering.
|
||||
if (!hostFileManager.isIncluded(nodeReg)) {
|
||||
if (!hostConfigManager.isIncluded(nodeReg)) {
|
||||
throw new DisallowedDatanodeException(nodeReg);
|
||||
}
|
||||
|
||||
|
@ -929,7 +939,8 @@ public class DatanodeManager {
|
|||
getNetworkDependenciesWithDefault(nodeS));
|
||||
}
|
||||
getNetworkTopology().add(nodeS);
|
||||
|
||||
resolveUpgradeDomain(nodeS);
|
||||
|
||||
// also treat the registration message as a heartbeat
|
||||
heartbeatManager.register(nodeS);
|
||||
incrementVersionCount(nodeS.getSoftwareVersion());
|
||||
|
@ -961,7 +972,8 @@ public class DatanodeManager {
|
|||
}
|
||||
networktopology.add(nodeDescr);
|
||||
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
|
||||
|
||||
resolveUpgradeDomain(nodeDescr);
|
||||
|
||||
// register new datanode
|
||||
addDatanode(nodeDescr);
|
||||
blockManager.getBlockReportLeaseManager().register(nodeDescr);
|
||||
|
@ -1016,9 +1028,9 @@ public class DatanodeManager {
|
|||
// Update the file names and refresh internal includes and excludes list.
|
||||
if (conf == null) {
|
||||
conf = new HdfsConfiguration();
|
||||
this.hostConfigManager.setConf(conf);
|
||||
}
|
||||
this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
||||
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
||||
this.hostConfigManager.refresh();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1034,15 +1046,16 @@ public class DatanodeManager {
|
|||
}
|
||||
for (DatanodeDescriptor node : copy.values()) {
|
||||
// Check if not include.
|
||||
if (!hostFileManager.isIncluded(node)) {
|
||||
if (!hostConfigManager.isIncluded(node)) {
|
||||
node.setDisallowed(true); // case 2.
|
||||
} else {
|
||||
if (hostFileManager.isExcluded(node)) {
|
||||
if (hostConfigManager.isExcluded(node)) {
|
||||
decomManager.startDecommission(node); // case 3.
|
||||
} else {
|
||||
decomManager.stopDecommission(node); // case 4.
|
||||
}
|
||||
}
|
||||
node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1258,9 +1271,9 @@ public class DatanodeManager {
|
|||
type == DatanodeReportType.DECOMMISSIONING;
|
||||
|
||||
ArrayList<DatanodeDescriptor> nodes;
|
||||
final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet();
|
||||
final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
|
||||
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
|
||||
final HostSet foundNodes = new HostSet();
|
||||
final Iterable<InetSocketAddress> includedNodes =
|
||||
hostConfigManager.getIncludes();
|
||||
|
||||
synchronized(this) {
|
||||
nodes = new ArrayList<>(datanodeMap.size());
|
||||
|
@ -1271,11 +1284,11 @@ public class DatanodeManager {
|
|||
if (((listLiveNodes && !isDead) ||
|
||||
(listDeadNodes && isDead) ||
|
||||
(listDecommissioningNodes && isDecommissioning)) &&
|
||||
hostFileManager.isIncluded(dn)) {
|
||||
hostConfigManager.isIncluded(dn)) {
|
||||
nodes.add(dn);
|
||||
}
|
||||
|
||||
foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn));
|
||||
foundNodes.add(dn.getResolvedAddress());
|
||||
}
|
||||
}
|
||||
Collections.sort(nodes);
|
||||
|
@ -1299,7 +1312,7 @@ public class DatanodeManager {
|
|||
addr.getPort() == 0 ? defaultXferPort : addr.getPort(),
|
||||
defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
|
||||
setDatanodeDead(dn);
|
||||
if (excludedNodes.match(addr)) {
|
||||
if (hostConfigManager.isExcluded(dn)) {
|
||||
dn.setDecommissioned();
|
||||
}
|
||||
nodes.add(dn);
|
||||
|
@ -1308,8 +1321,8 @@ public class DatanodeManager {
|
|||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("getDatanodeListForReport with " +
|
||||
"includedNodes = " + hostFileManager.getIncludes() +
|
||||
", excludedNodes = " + hostFileManager.getExcludes() +
|
||||
"includedNodes = " + hostConfigManager.getIncludes() +
|
||||
", excludedNodes = " + hostConfigManager.getExcludes() +
|
||||
", foundNodes = " + foundNodes +
|
||||
", nodes = " + nodes);
|
||||
}
|
||||
|
|
|
@ -18,28 +18,18 @@
|
|||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.UnmodifiableIterator;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.util.HostsFileReader;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class manages the include and exclude files for HDFS.
|
||||
|
@ -59,11 +49,27 @@ import java.util.Map;
|
|||
* of DNs when it fails to do a forward and reverse lookup. Note that DNS
|
||||
* resolutions are only done during the loading time to minimize the latency.
|
||||
*/
|
||||
class HostFileManager {
|
||||
public class HostFileManager extends HostConfigManager {
|
||||
private static final Log LOG = LogFactory.getLog(HostFileManager.class);
|
||||
private Configuration conf;
|
||||
private HostSet includes = new HostSet();
|
||||
private HostSet excludes = new HostSet();
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh() throws IOException {
|
||||
refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
||||
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
||||
}
|
||||
private static HostSet readFile(String type, String filename)
|
||||
throws IOException {
|
||||
HostSet res = new HostSet();
|
||||
|
@ -99,31 +105,37 @@ class HostFileManager {
|
|||
return null;
|
||||
}
|
||||
|
||||
static InetSocketAddress resolvedAddressFromDatanodeID(DatanodeID id) {
|
||||
return new InetSocketAddress(id.getIpAddr(), id.getXferPort());
|
||||
}
|
||||
|
||||
synchronized HostSet getIncludes() {
|
||||
@Override
|
||||
public synchronized HostSet getIncludes() {
|
||||
return includes;
|
||||
}
|
||||
|
||||
synchronized HostSet getExcludes() {
|
||||
@Override
|
||||
public synchronized HostSet getExcludes() {
|
||||
return excludes;
|
||||
}
|
||||
|
||||
// If the includes list is empty, act as if everything is in the
|
||||
// includes list.
|
||||
synchronized boolean isIncluded(DatanodeID dn) {
|
||||
return includes.isEmpty() || includes.match
|
||||
(resolvedAddressFromDatanodeID(dn));
|
||||
@Override
|
||||
public synchronized boolean isIncluded(DatanodeID dn) {
|
||||
return includes.isEmpty() || includes.match(dn.getResolvedAddress());
|
||||
}
|
||||
|
||||
synchronized boolean isExcluded(DatanodeID dn) {
|
||||
return excludes.match(resolvedAddressFromDatanodeID(dn));
|
||||
@Override
|
||||
public synchronized boolean isExcluded(DatanodeID dn) {
|
||||
return isExcluded(dn.getResolvedAddress());
|
||||
}
|
||||
|
||||
synchronized boolean hasIncludes() {
|
||||
return !includes.isEmpty();
|
||||
private boolean isExcluded(InetSocketAddress address) {
|
||||
return excludes.match(address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getUpgradeDomain(final DatanodeID dn) {
|
||||
// The include/exclude files based config doesn't support upgrade domain
|
||||
// config.
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -133,7 +145,8 @@ class HostFileManager {
|
|||
* @param excludeFile the path to the new excludes list
|
||||
* @throws IOException thrown if there is a problem reading one of the files
|
||||
*/
|
||||
void refresh(String includeFile, String excludeFile) throws IOException {
|
||||
private void refresh(String includeFile, String excludeFile)
|
||||
throws IOException {
|
||||
HostSet newIncludes = readFile("included", includeFile);
|
||||
HostSet newExcludes = readFile("excluded", excludeFile);
|
||||
|
||||
|
@ -153,84 +166,4 @@ class HostFileManager {
|
|||
excludes = newExcludes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The HostSet allows efficient queries on matching wildcard addresses.
|
||||
* <p/>
|
||||
* For InetSocketAddress A and B with the same host address,
|
||||
* we define a partial order between A and B, A <= B iff A.getPort() == B
|
||||
* .getPort() || B.getPort() == 0.
|
||||
*/
|
||||
static class HostSet implements Iterable<InetSocketAddress> {
|
||||
// Host -> lists of ports
|
||||
private final Multimap<InetAddress, Integer> addrs = HashMultimap.create();
|
||||
|
||||
/**
|
||||
* The function that checks whether there exists an entry foo in the set
|
||||
* so that foo <= addr.
|
||||
*/
|
||||
boolean matchedBy(InetSocketAddress addr) {
|
||||
Collection<Integer> ports = addrs.get(addr.getAddress());
|
||||
return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
|
||||
.getPort());
|
||||
}
|
||||
|
||||
/**
|
||||
* The function that checks whether there exists an entry foo in the set
|
||||
* so that addr <= foo.
|
||||
*/
|
||||
boolean match(InetSocketAddress addr) {
|
||||
int port = addr.getPort();
|
||||
Collection<Integer> ports = addrs.get(addr.getAddress());
|
||||
boolean exactMatch = ports.contains(port);
|
||||
boolean genericMatch = ports.contains(0);
|
||||
return exactMatch || genericMatch;
|
||||
}
|
||||
|
||||
boolean isEmpty() {
|
||||
return addrs.isEmpty();
|
||||
}
|
||||
|
||||
int size() {
|
||||
return addrs.size();
|
||||
}
|
||||
|
||||
void add(InetSocketAddress addr) {
|
||||
Preconditions.checkArgument(!addr.isUnresolved());
|
||||
addrs.put(addr.getAddress(), addr.getPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<InetSocketAddress> iterator() {
|
||||
return new UnmodifiableIterator<InetSocketAddress>() {
|
||||
private final Iterator<Map.Entry<InetAddress,
|
||||
Integer>> it = addrs.entries().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return it.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress next() {
|
||||
Map.Entry<InetAddress, Integer> e = it.next();
|
||||
return new InetSocketAddress(e.getKey(), e.getValue());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("HostSet(");
|
||||
Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
|
||||
new Function<InetSocketAddress, String>() {
|
||||
@Override
|
||||
public String apply(@Nullable InetSocketAddress addr) {
|
||||
assert addr != null;
|
||||
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
|
||||
}
|
||||
}));
|
||||
return sb.append(")").toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2962,4 +2962,18 @@
|
|||
retries or failovers for WebHDFS client.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.hosts.provider.classname</name>
|
||||
<value>org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager</value>
|
||||
<description>
|
||||
The class that provides access for host files.
|
||||
org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager is used
|
||||
by default which loads files specified by dfs.hosts and dfs.hosts.exclude.
|
||||
If org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager is
|
||||
used, it will load the JSON file defined in dfs.hosts.
|
||||
To change class name, nn restart is required. "dfsadmin -refreshNodes" only
|
||||
refreshes the configuration files used by the class.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -142,12 +142,16 @@ The `bin/hdfs dfsadmin` command supports a few HDFS administration related opera
|
|||
during last upgrade.
|
||||
|
||||
* `-refreshNodes`: Updates the namenode with the set of datanodes
|
||||
allowed to connect to the namenode. Namenodes re-read datanode
|
||||
allowed to connect to the namenode. By default, Namenodes re-read datanode
|
||||
hostnames in the file defined by `dfs.hosts`, `dfs.hosts.exclude`
|
||||
Hosts defined in `dfs.hosts` are the datanodes that are part of the
|
||||
cluster. If there are entries in `dfs.hosts`, only the hosts in it
|
||||
are allowed to register with the namenode. Entries in
|
||||
`dfs.hosts.exclude` are datanodes that need to be decommissioned.
|
||||
Alternatively if `dfs.namenode.hosts.provider.classname` is set to
|
||||
`org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager`,
|
||||
all include and exclude hosts are specified in the JSON file defined by
|
||||
`dfs.hosts`.
|
||||
Datanodes complete decommissioning when all the replicas from them
|
||||
are replicated to other datanodes. Decommissioned nodes are not
|
||||
automatically shutdown and are not chosen for writing for new
|
||||
|
|
|
@ -29,11 +29,16 @@ import java.util.List;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -43,7 +48,57 @@ public class TestDatanodeReport {
|
|||
static final Log LOG = LogFactory.getLog(TestDatanodeReport.class);
|
||||
final static private Configuration conf = new HdfsConfiguration();
|
||||
final static private int NUM_OF_DATANODES = 4;
|
||||
|
||||
|
||||
/**
|
||||
* This test verifies upgrade domain is set according to the JSON host file.
|
||||
*/
|
||||
@Test
|
||||
public void testDatanodeReportWithUpgradeDomain() throws Exception {
|
||||
conf.setInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); // 0.5s
|
||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
|
||||
CombinedHostFileManager.class, HostConfigManager.class);
|
||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||
hostsFileWriter.initialize(conf, "temp/datanodeReport");
|
||||
|
||||
MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
final DFSClient client = cluster.getFileSystem().dfs;
|
||||
final String ud1 = "ud1";
|
||||
final String ud2 = "ud2";
|
||||
|
||||
try {
|
||||
//wait until the cluster is up
|
||||
cluster.waitActive();
|
||||
|
||||
DatanodeAdminProperties datanode = new DatanodeAdminProperties();
|
||||
datanode.setHostName(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
|
||||
datanode.setUpgradeDomain(ud1);
|
||||
hostsFileWriter.initIncludeHosts(
|
||||
new DatanodeAdminProperties[]{datanode});
|
||||
client.refreshNodes();
|
||||
DatanodeInfo[] all = client.datanodeReport(DatanodeReportType.ALL);
|
||||
assertEquals(all[0].getUpgradeDomain(), ud1);
|
||||
|
||||
datanode.setUpgradeDomain(null);
|
||||
hostsFileWriter.initIncludeHosts(
|
||||
new DatanodeAdminProperties[]{datanode});
|
||||
client.refreshNodes();
|
||||
all = client.datanodeReport(DatanodeReportType.ALL);
|
||||
assertEquals(all[0].getUpgradeDomain(), null);
|
||||
|
||||
datanode.setUpgradeDomain(ud2);
|
||||
hostsFileWriter.initIncludeHosts(
|
||||
new DatanodeAdminProperties[]{datanode});
|
||||
client.refreshNodes();
|
||||
all = client.datanodeReport(DatanodeReportType.ALL);
|
||||
assertEquals(all[0].getUpgradeDomain(), ud2);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test attempts to different types of datanode report.
|
||||
*/
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Test;
|
||||
|
@ -384,17 +385,8 @@ public class TestBlocksWithNotEnoughRacks {
|
|||
short REPLICATION_FACTOR = 2;
|
||||
final Path filePath = new Path("/testFile");
|
||||
|
||||
// Configure an excludes file
|
||||
FileSystem localFileSys = FileSystem.getLocal(conf);
|
||||
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
|
||||
Path dir = new Path(workingDir, "temp/decommission");
|
||||
Path excludeFile = new Path(dir, "exclude");
|
||||
Path includeFile = new Path(dir, "include");
|
||||
assertTrue(localFileSys.mkdirs(dir));
|
||||
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
|
||||
DFSTestUtil.writeFile(localFileSys, includeFile, "");
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
|
||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||
hostsFileWriter.initialize(conf, "temp/decommission");
|
||||
|
||||
// Two blocks and four racks
|
||||
String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
|
||||
|
@ -415,7 +407,7 @@ public class TestBlocksWithNotEnoughRacks {
|
|||
BlockLocation locs[] = fs.getFileBlockLocations(
|
||||
fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
|
||||
String name = locs[0].getNames()[0];
|
||||
DFSTestUtil.writeFile(localFileSys, excludeFile, name);
|
||||
hostsFileWriter.initExcludeHost(name);
|
||||
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
|
||||
DFSTestUtil.waitForDecommission(fs, name);
|
||||
|
||||
|
@ -423,6 +415,7 @@ public class TestBlocksWithNotEnoughRacks {
|
|||
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
hostsFileWriter.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -437,17 +430,8 @@ public class TestBlocksWithNotEnoughRacks {
|
|||
short REPLICATION_FACTOR = 5;
|
||||
final Path filePath = new Path("/testFile");
|
||||
|
||||
// Configure an excludes file
|
||||
FileSystem localFileSys = FileSystem.getLocal(conf);
|
||||
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
|
||||
Path dir = new Path(workingDir, "temp/decommission");
|
||||
Path excludeFile = new Path(dir, "exclude");
|
||||
Path includeFile = new Path(dir, "include");
|
||||
assertTrue(localFileSys.mkdirs(dir));
|
||||
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
|
||||
DFSTestUtil.writeFile(localFileSys, includeFile, "");
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||
hostsFileWriter.initialize(conf, "temp/decommission");
|
||||
|
||||
// All hosts are on two racks, only one host on /rack2
|
||||
String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
|
||||
|
@ -473,7 +457,7 @@ public class TestBlocksWithNotEnoughRacks {
|
|||
for (String top : locs[0].getTopologyPaths()) {
|
||||
if (!top.startsWith("/rack2")) {
|
||||
String name = top.substring("/rack1".length()+1);
|
||||
DFSTestUtil.writeFile(localFileSys, excludeFile, name);
|
||||
hostsFileWriter.initExcludeHost(name);
|
||||
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
|
||||
DFSTestUtil.waitForDecommission(fs, name);
|
||||
break;
|
||||
|
@ -485,6 +469,7 @@ public class TestBlocksWithNotEnoughRacks {
|
|||
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
hostsFileWriter.cleanup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -383,9 +383,9 @@ public class TestDatanodeManager {
|
|||
|
||||
DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
|
||||
HostFileManager hm = new HostFileManager();
|
||||
HostFileManager.HostSet noNodes = new HostFileManager.HostSet();
|
||||
HostFileManager.HostSet oneNode = new HostFileManager.HostSet();
|
||||
HostFileManager.HostSet twoNodes = new HostFileManager.HostSet();
|
||||
HostSet noNodes = new HostSet();
|
||||
HostSet oneNode = new HostSet();
|
||||
HostSet twoNodes = new HostSet();
|
||||
DatanodeRegistration dr1 = new DatanodeRegistration(
|
||||
new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123",
|
||||
12345, 12345, 12345, 12345),
|
||||
|
@ -402,7 +402,7 @@ public class TestDatanodeManager {
|
|||
oneNode.add(entry("127.0.0.1:23456"));
|
||||
|
||||
hm.refresh(twoNodes, noNodes);
|
||||
Whitebox.setInternalState(dm, "hostFileManager", hm);
|
||||
Whitebox.setInternalState(dm, "hostConfigManager", hm);
|
||||
|
||||
// Register two data nodes to simulate them coming up.
|
||||
// We need to add two nodes, because if we have only one node, removing it
|
||||
|
|
|
@ -40,7 +40,7 @@ public class TestHostFileManager {
|
|||
|
||||
@Test
|
||||
public void testDeduplication() {
|
||||
HostFileManager.HostSet s = new HostFileManager.HostSet();
|
||||
HostSet s = new HostSet();
|
||||
// These entries will be de-duped, since they refer to the same IP
|
||||
// address + port combo.
|
||||
s.add(entry("127.0.0.1:12345"));
|
||||
|
@ -60,7 +60,7 @@ public class TestHostFileManager {
|
|||
|
||||
@Test
|
||||
public void testRelation() {
|
||||
HostFileManager.HostSet s = new HostFileManager.HostSet();
|
||||
HostSet s = new HostSet();
|
||||
s.add(entry("127.0.0.1:123"));
|
||||
Assert.assertTrue(s.match(entry("127.0.0.1:123")));
|
||||
Assert.assertFalse(s.match(entry("127.0.0.1:12")));
|
||||
|
@ -105,8 +105,8 @@ public class TestHostFileManager {
|
|||
FSNamesystem fsn = mock(FSNamesystem.class);
|
||||
Configuration conf = new Configuration();
|
||||
HostFileManager hm = new HostFileManager();
|
||||
HostFileManager.HostSet includedNodes = new HostFileManager.HostSet();
|
||||
HostFileManager.HostSet excludedNodes = new HostFileManager.HostSet();
|
||||
HostSet includedNodes = new HostSet();
|
||||
HostSet excludedNodes = new HostSet();
|
||||
|
||||
includedNodes.add(entry("127.0.0.1:12345"));
|
||||
includedNodes.add(entry("localhost:12345"));
|
||||
|
@ -122,7 +122,7 @@ public class TestHostFileManager {
|
|||
hm.refresh(includedNodes, excludedNodes);
|
||||
|
||||
DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
|
||||
Whitebox.setInternalState(dm, "hostFileManager", hm);
|
||||
Whitebox.setInternalState(dm, "hostConfigManager", hm);
|
||||
Map<String, DatanodeDescriptor> dnMap = (Map<String,
|
||||
DatanodeDescriptor>) Whitebox.getInternalState(dm, "datanodeMap");
|
||||
|
||||
|
|
|
@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -37,15 +36,33 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager;
|
||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
/**
|
||||
* DFS_HOSTS and DFS_HOSTS_EXCLUDE tests
|
||||
*
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestHostsFiles {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestHostsFiles.class.getName());
|
||||
private Class hostFileMgrClass;
|
||||
|
||||
public TestHostsFiles(Class hostFileMgrClass) {
|
||||
this.hostFileMgrClass = hostFileMgrClass;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Iterable<Object[]> data() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{HostFileManager.class}, {CombinedHostFileManager.class}});
|
||||
}
|
||||
|
||||
/*
|
||||
* Return a configuration object with low timeouts for testing and
|
||||
|
@ -72,6 +89,10 @@ public class TestHostsFiles {
|
|||
|
||||
// Indicates we have multiple racks
|
||||
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
|
||||
|
||||
// Host file manager
|
||||
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
|
||||
hostFileMgrClass, HostConfigManager.class);
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
@ -80,18 +101,8 @@ public class TestHostsFiles {
|
|||
Configuration conf = getConf();
|
||||
short REPLICATION_FACTOR = 2;
|
||||
final Path filePath = new Path("/testFile");
|
||||
|
||||
// Configure an excludes file
|
||||
FileSystem localFileSys = FileSystem.getLocal(conf);
|
||||
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
|
||||
Path dir = new Path(workingDir, "temp/decommission");
|
||||
Path excludeFile = new Path(dir, "exclude");
|
||||
Path includeFile = new Path(dir, "include");
|
||||
assertTrue(localFileSys.mkdirs(dir));
|
||||
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
|
||||
DFSTestUtil.writeFile(localFileSys, includeFile, "");
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
|
||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||
hostsFileWriter.initialize(conf, "temp/decommission");
|
||||
|
||||
// Two blocks and four racks
|
||||
String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
|
||||
|
@ -112,9 +123,8 @@ public class TestHostsFiles {
|
|||
BlockLocation locs[] = fs.getFileBlockLocations(
|
||||
fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
|
||||
String name = locs[0].getNames()[0];
|
||||
String names = name + "\n" + "localhost:42\n";
|
||||
LOG.info("adding '" + names + "' to exclude file " + excludeFile.toUri().getPath());
|
||||
DFSTestUtil.writeFile(localFileSys, excludeFile, name);
|
||||
LOG.info("adding '" + name + "' to decommission");
|
||||
hostsFileWriter.initExcludeHost(name);
|
||||
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
|
||||
DFSTestUtil.waitForDecommission(fs, name);
|
||||
|
||||
|
@ -131,9 +141,7 @@ public class TestHostsFiles {
|
|||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
if (localFileSys.exists(dir)) {
|
||||
FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
|
||||
}
|
||||
hostsFileWriter.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,20 +149,10 @@ public class TestHostsFiles {
|
|||
public void testHostsIncludeForDeadCount() throws Exception {
|
||||
Configuration conf = getConf();
|
||||
|
||||
// Configure an excludes file
|
||||
FileSystem localFileSys = FileSystem.getLocal(conf);
|
||||
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
|
||||
Path dir = new Path(workingDir, "temp/decommission");
|
||||
Path excludeFile = new Path(dir, "exclude");
|
||||
Path includeFile = new Path(dir, "include");
|
||||
assertTrue(localFileSys.mkdirs(dir));
|
||||
StringBuilder includeHosts = new StringBuilder();
|
||||
includeHosts.append("localhost:52").append("\n").append("127.0.0.1:7777")
|
||||
.append("\n");
|
||||
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
|
||||
DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
|
||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||
hostsFileWriter.initialize(conf, "temp/decommission");
|
||||
hostsFileWriter.initIncludeHosts(new String[]
|
||||
{"localhost:52","127.0.0.1:7777"});
|
||||
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
|
@ -174,9 +172,7 @@ public class TestHostsFiles {
|
|||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
if (localFileSys.exists(dir)) {
|
||||
FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
|
||||
}
|
||||
hostsFileWriter.cleanup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
|||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
|
||||
import org.apache.hadoop.net.ServerSocketUtil;
|
||||
|
@ -44,9 +45,9 @@ import org.mortbay.util.ajax.JSON;
|
|||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectName;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -236,8 +237,8 @@ public class TestNameNodeMXBean {
|
|||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem localFileSys = null;
|
||||
Path dir = null;
|
||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||
hostsFileWriter.initialize(conf, "temp/TestNameNodeMXBean");
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
|
@ -249,18 +250,12 @@ public class TestNameNodeMXBean {
|
|||
ObjectName mxbeanName = new ObjectName(
|
||||
"Hadoop:service=NameNode,name=NameNodeInfo");
|
||||
|
||||
// Define include file to generate deadNodes metrics
|
||||
localFileSys = FileSystem.getLocal(conf);
|
||||
Path workingDir = localFileSys.getWorkingDirectory();
|
||||
dir = new Path(workingDir,"build/test/data/temp/TestNameNodeMXBean");
|
||||
Path includeFile = new Path(dir, "include");
|
||||
assertTrue(localFileSys.mkdirs(dir));
|
||||
StringBuilder includeHosts = new StringBuilder();
|
||||
List<String> hosts = new ArrayList<>();
|
||||
for(DataNode dn : cluster.getDataNodes()) {
|
||||
includeHosts.append(dn.getDisplayName()).append("\n");
|
||||
hosts.add(dn.getDisplayName());
|
||||
}
|
||||
DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
|
||||
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
|
||||
hostsFileWriter.initIncludeHosts(hosts.toArray(
|
||||
new String[hosts.size()]));
|
||||
fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
|
||||
|
||||
cluster.stopDataNode(0);
|
||||
|
@ -282,12 +277,10 @@ public class TestNameNodeMXBean {
|
|||
assertTrue(deadNode.containsKey("xferaddr"));
|
||||
}
|
||||
} finally {
|
||||
if ((localFileSys != null) && localFileSys.exists(dir)) {
|
||||
FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
hostsFileWriter.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.io.IOException;
|
|||
import java.lang.management.ManagementFactory;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
@ -58,6 +57,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -583,27 +583,15 @@ public class TestStartup {
|
|||
@Test
|
||||
public void testNNRestart() throws IOException, InterruptedException {
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem localFileSys;
|
||||
Path hostsFile;
|
||||
Path excludeFile;
|
||||
int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
|
||||
// Set up the hosts/exclude files.
|
||||
localFileSys = FileSystem.getLocal(config);
|
||||
Path workingDir = localFileSys.getWorkingDirectory();
|
||||
Path dir = new Path(workingDir, "build/test/data/work-dir/restartnn");
|
||||
hostsFile = new Path(dir, "hosts");
|
||||
excludeFile = new Path(dir, "exclude");
|
||||
|
||||
// Setup conf
|
||||
config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
||||
writeConfigFile(localFileSys, excludeFile, null);
|
||||
config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
|
||||
// write into hosts file
|
||||
ArrayList<String>list = new ArrayList<String>();
|
||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||
hostsFileWriter.initialize(config, "work-dir/restartnn");
|
||||
|
||||
byte b[] = {127, 0, 0, 1};
|
||||
InetAddress inetAddress = InetAddress.getByAddress(b);
|
||||
list.add(inetAddress.getHostName());
|
||||
writeConfigFile(localFileSys, hostsFile, list);
|
||||
hostsFileWriter.initIncludeHosts(new String[] {inetAddress.getHostName()});
|
||||
|
||||
int numDatanodes = 1;
|
||||
|
||||
try {
|
||||
|
@ -628,37 +616,12 @@ public class TestStartup {
|
|||
fail(StringUtils.stringifyException(e));
|
||||
throw e;
|
||||
} finally {
|
||||
cleanupFile(localFileSys, excludeFile.getParent());
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
hostsFileWriter.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
private void writeConfigFile(FileSystem localFileSys, Path name,
|
||||
ArrayList<String> nodes) throws IOException {
|
||||
// delete if it already exists
|
||||
if (localFileSys.exists(name)) {
|
||||
localFileSys.delete(name, true);
|
||||
}
|
||||
|
||||
FSDataOutputStream stm = localFileSys.create(name);
|
||||
if (nodes != null) {
|
||||
for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
|
||||
String node = it.next();
|
||||
stm.writeBytes(node);
|
||||
stm.writeBytes("\n");
|
||||
}
|
||||
}
|
||||
stm.close();
|
||||
}
|
||||
|
||||
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
|
||||
assertTrue(fileSys.exists(name));
|
||||
fileSys.delete(name, true);
|
||||
assertTrue(!fileSys.exists(name));
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testXattrConfiguration() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue