HDFS-3934. duplicative dfs_hosts entries handled wrong. (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489065 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9a3379f6b1
commit
a7bfb25d2b
|
@ -48,7 +48,8 @@ public HostsFileReader(String inFile,
|
||||||
refresh();
|
refresh();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readFileToSet(String filename, Set<String> set) throws IOException {
|
public static void readFileToSet(String type,
|
||||||
|
String filename, Set<String> set) throws IOException {
|
||||||
File file = new File(filename);
|
File file = new File(filename);
|
||||||
FileInputStream fis = new FileInputStream(file);
|
FileInputStream fis = new FileInputStream(file);
|
||||||
BufferedReader reader = null;
|
BufferedReader reader = null;
|
||||||
|
@ -64,8 +65,9 @@ private void readFileToSet(String filename, Set<String> set) throws IOException
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (!nodes[i].isEmpty()) {
|
if (!nodes[i].isEmpty()) {
|
||||||
LOG.info("Adding " + nodes[i] + " to the list of hosts from " + filename);
|
LOG.info("Adding " + nodes[i] + " to the list of " + type +
|
||||||
set.add(nodes[i]); // might need to add canonical name
|
" hosts from " + filename);
|
||||||
|
set.add(nodes[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,13 +84,13 @@ public synchronized void refresh() throws IOException {
|
||||||
LOG.info("Refreshing hosts (include/exclude) list");
|
LOG.info("Refreshing hosts (include/exclude) list");
|
||||||
if (!includesFile.isEmpty()) {
|
if (!includesFile.isEmpty()) {
|
||||||
Set<String> newIncludes = new HashSet<String>();
|
Set<String> newIncludes = new HashSet<String>();
|
||||||
readFileToSet(includesFile, newIncludes);
|
readFileToSet("included", includesFile, newIncludes);
|
||||||
// switch the new hosts that are to be included
|
// switch the new hosts that are to be included
|
||||||
includes = newIncludes;
|
includes = newIncludes;
|
||||||
}
|
}
|
||||||
if (!excludesFile.isEmpty()) {
|
if (!excludesFile.isEmpty()) {
|
||||||
Set<String> newExcludes = new HashSet<String>();
|
Set<String> newExcludes = new HashSet<String>();
|
||||||
readFileToSet(excludesFile, newExcludes);
|
readFileToSet("excluded", excludesFile, newExcludes);
|
||||||
// switch the excluded hosts
|
// switch the excluded hosts
|
||||||
excludes = newExcludes;
|
excludes = newExcludes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,6 +238,9 @@ Trunk (Unreleased)
|
||||||
HDFS-4687. TestDelegationTokenForProxyUser#testWebHdfsDoAs is flaky with
|
HDFS-4687. TestDelegationTokenForProxyUser#testWebHdfsDoAs is flaky with
|
||||||
JDK7. (Andrew Wang via atm)
|
JDK7. (Andrew Wang via atm)
|
||||||
|
|
||||||
|
HDFS-3934. duplicative dfs_hosts entries handled wrong. (Colin Patrick
|
||||||
|
McCabe)
|
||||||
|
|
||||||
Release 2.1.0-beta - UNRELEASED
|
Release 2.1.0-beta - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -251,6 +251,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
|
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
|
||||||
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
|
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
|
||||||
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
|
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
|
||||||
|
public static final String DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS =
|
||||||
|
"dfs.namenode.hosts.dns.resolution.interval.seconds";
|
||||||
|
public static final int DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS_DEFAULT = 300;
|
||||||
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
|
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
|
||||||
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = "dfs.namenode.checkpoint.dir";
|
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = "dfs.namenode.checkpoint.dir";
|
||||||
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
|
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
|
||||||
|
@ -324,7 +327,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_DATANODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTPS_DEFAULT_PORT;
|
public static final String DFS_DATANODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTPS_DEFAULT_PORT;
|
||||||
public static final String DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
|
public static final String DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
|
||||||
public static final int DFS_DATANODE_IPC_DEFAULT_PORT = 50020;
|
public static final int DFS_DATANODE_IPC_DEFAULT_PORT = 50020;
|
||||||
public static final String DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0" + DFS_DATANODE_IPC_DEFAULT_PORT;
|
public static final String DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT;
|
||||||
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
|
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
|
||||||
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
|
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
|
||||||
|
|
||||||
|
|
|
@ -26,11 +26,9 @@
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -50,6 +48,10 @@
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
||||||
|
@ -62,16 +64,17 @@
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
||||||
import org.apache.hadoop.hdfs.util.CyclicIteration;
|
import org.apache.hadoop.hdfs.util.CyclicIteration;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
|
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.net.NodeBase;
|
import org.apache.hadoop.net.NodeBase;
|
||||||
import org.apache.hadoop.net.ScriptBasedMapping;
|
import org.apache.hadoop.net.ScriptBasedMapping;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.util.HostsFileReader;
|
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
@ -120,8 +123,14 @@ public class DatanodeManager {
|
||||||
|
|
||||||
private final DNSToSwitchMapping dnsToSwitchMapping;
|
private final DNSToSwitchMapping dnsToSwitchMapping;
|
||||||
|
|
||||||
|
private final int defaultXferPort;
|
||||||
|
|
||||||
|
private final int defaultInfoPort;
|
||||||
|
|
||||||
|
private final int defaultIpcPort;
|
||||||
|
|
||||||
/** Read include/exclude files*/
|
/** Read include/exclude files*/
|
||||||
private final HostsFileReader hostsReader;
|
private final HostFileManager hostFileManager;
|
||||||
|
|
||||||
/** The period to wait for datanode heartbeat.*/
|
/** The period to wait for datanode heartbeat.*/
|
||||||
private final long heartbeatExpireInterval;
|
private final long heartbeatExpireInterval;
|
||||||
|
@ -162,13 +171,30 @@ public class DatanodeManager {
|
||||||
this.namesystem = namesystem;
|
this.namesystem = namesystem;
|
||||||
this.blockManager = blockManager;
|
this.blockManager = blockManager;
|
||||||
|
|
||||||
|
int dnsResolutionSeconds = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS_DEFAULT);
|
||||||
|
this.hostFileManager = new HostFileManager(dnsResolutionSeconds);
|
||||||
|
|
||||||
networktopology = NetworkTopology.getInstance(conf);
|
networktopology = NetworkTopology.getInstance(conf);
|
||||||
|
|
||||||
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
|
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
|
||||||
|
|
||||||
this.hostsReader = new HostsFileReader(
|
this.defaultXferPort = NetUtils.createSocketAddr(
|
||||||
conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
|
||||||
|
this.defaultInfoPort = NetUtils.createSocketAddr(
|
||||||
|
conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
|
||||||
|
this.defaultIpcPort = NetUtils.createSocketAddr(
|
||||||
|
conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
|
||||||
|
try {
|
||||||
|
this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
||||||
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("error reading hosts files: ", e);
|
||||||
|
}
|
||||||
|
|
||||||
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
|
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
|
||||||
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||||
|
@ -178,8 +204,14 @@ public class DatanodeManager {
|
||||||
// locations of those hosts in the include list and store the mapping
|
// locations of those hosts in the include list and store the mapping
|
||||||
// in the cache; so future calls to resolve will be fast.
|
// in the cache; so future calls to resolve will be fast.
|
||||||
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
||||||
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
|
final ArrayList<String> locations = new ArrayList<String>();
|
||||||
|
for (Entry entry : hostFileManager.getIncludes()) {
|
||||||
|
if (!entry.getIpAddress().isEmpty()) {
|
||||||
|
locations.add(entry.getIpAddress());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
dnsToSwitchMapping.resolve(locations);
|
||||||
|
};
|
||||||
|
|
||||||
final long heartbeatIntervalSeconds = conf.getLong(
|
final long heartbeatIntervalSeconds = conf.getLong(
|
||||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
|
@ -264,6 +296,7 @@ void activate(final Configuration conf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void close() {
|
void close() {
|
||||||
|
IOUtils.cleanup(LOG, hostFileManager);
|
||||||
if (decommissionthread != null) {
|
if (decommissionthread != null) {
|
||||||
decommissionthread.interrupt();
|
decommissionthread.interrupt();
|
||||||
try {
|
try {
|
||||||
|
@ -533,14 +566,6 @@ private String resolveNetworkLocation (DatanodeID node) {
|
||||||
return networkLocation;
|
return networkLocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean inHostsList(DatanodeID node) {
|
|
||||||
return checkInList(node, hostsReader.getHosts(), false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean inExcludedHostsList(DatanodeID node) {
|
|
||||||
return checkInList(node, hostsReader.getExcludedHosts(), true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove an already decommissioned data node who is neither in include nor
|
* Remove an already decommissioned data node who is neither in include nor
|
||||||
* exclude hosts lists from the the list of live or dead nodes. This is used
|
* exclude hosts lists from the the list of live or dead nodes. This is used
|
||||||
|
@ -570,13 +595,13 @@ private boolean inExcludedHostsList(DatanodeID node) {
|
||||||
private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
|
private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
|
||||||
// If the include list is empty, any nodes are welcomed and it does not
|
// If the include list is empty, any nodes are welcomed and it does not
|
||||||
// make sense to exclude any nodes from the cluster. Therefore, no remove.
|
// make sense to exclude any nodes from the cluster. Therefore, no remove.
|
||||||
if (hostsReader.getHosts().isEmpty()) {
|
if (!hostFileManager.hasIncludes()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
|
for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
|
||||||
DatanodeDescriptor node = it.next();
|
DatanodeDescriptor node = it.next();
|
||||||
if ((!inHostsList(node)) && (!inExcludedHostsList(node))
|
if ((!hostFileManager.isIncluded(node)) && (!hostFileManager.isExcluded(node))
|
||||||
&& node.isDecommissioned()) {
|
&& node.isDecommissioned()) {
|
||||||
// Include list is not empty, an existing datanode does not appear
|
// Include list is not empty, an existing datanode does not appear
|
||||||
// in both include or exclude lists and it has been decommissioned.
|
// in both include or exclude lists and it has been decommissioned.
|
||||||
|
@ -586,35 +611,12 @@ private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if the given DatanodeID is in the given (include or exclude) list.
|
|
||||||
*
|
|
||||||
* @param node the DatanodeID to check
|
|
||||||
* @param hostsList the list of hosts in the include/exclude file
|
|
||||||
* @param isExcludeList true if this is the exclude list
|
|
||||||
* @return true if the node is in the list, false otherwise
|
|
||||||
*/
|
|
||||||
private static boolean checkInList(final DatanodeID node,
|
|
||||||
final Set<String> hostsList,
|
|
||||||
final boolean isExcludeList) {
|
|
||||||
// if include list is empty, host is in include list
|
|
||||||
if ( (!isExcludeList) && (hostsList.isEmpty()) ){
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
for (String name : getNodeNamesForHostFiltering(node)) {
|
|
||||||
if (hostsList.contains(name)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decommission the node if it is in exclude list.
|
* Decommission the node if it is in exclude list.
|
||||||
*/
|
*/
|
||||||
private void checkDecommissioning(DatanodeDescriptor nodeReg) {
|
private void checkDecommissioning(DatanodeDescriptor nodeReg) {
|
||||||
// If the registered node is in exclude list, then decommission it
|
// If the registered node is in exclude list, then decommission it
|
||||||
if (inExcludedHostsList(nodeReg)) {
|
if (hostFileManager.isExcluded(nodeReg)) {
|
||||||
startDecommission(nodeReg);
|
startDecommission(nodeReg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -710,7 +712,7 @@ public void registerDatanode(DatanodeRegistration nodeReg)
|
||||||
|
|
||||||
// Checks if the node is not on the hosts list. If it is not, then
|
// Checks if the node is not on the hosts list. If it is not, then
|
||||||
// it will be disallowed from registering.
|
// it will be disallowed from registering.
|
||||||
if (!inHostsList(nodeReg)) {
|
if (!hostFileManager.isIncluded(nodeReg)) {
|
||||||
throw new DisallowedDatanodeException(nodeReg);
|
throw new DisallowedDatanodeException(nodeReg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -844,9 +846,8 @@ private void refreshHostsReader(Configuration conf) throws IOException {
|
||||||
if (conf == null) {
|
if (conf == null) {
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
}
|
}
|
||||||
hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
||||||
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
||||||
hostsReader.refresh();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -858,10 +859,10 @@ private void refreshHostsReader(Configuration conf) throws IOException {
|
||||||
private void refreshDatanodes() {
|
private void refreshDatanodes() {
|
||||||
for(DatanodeDescriptor node : datanodeMap.values()) {
|
for(DatanodeDescriptor node : datanodeMap.values()) {
|
||||||
// Check if not include.
|
// Check if not include.
|
||||||
if (!inHostsList(node)) {
|
if (!hostFileManager.isIncluded(node)) {
|
||||||
node.setDisallowed(true); // case 2.
|
node.setDisallowed(true); // case 2.
|
||||||
} else {
|
} else {
|
||||||
if (inExcludedHostsList(node)) {
|
if (hostFileManager.isExcluded(node)) {
|
||||||
startDecommission(node); // case 3.
|
startDecommission(node); // case 3.
|
||||||
} else {
|
} else {
|
||||||
stopDecommission(node); // case 4.
|
stopDecommission(node); // case 4.
|
||||||
|
@ -1076,25 +1077,10 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
|
||||||
boolean listDeadNodes = type == DatanodeReportType.ALL ||
|
boolean listDeadNodes = type == DatanodeReportType.ALL ||
|
||||||
type == DatanodeReportType.DEAD;
|
type == DatanodeReportType.DEAD;
|
||||||
|
|
||||||
HashMap<String, String> mustList = new HashMap<String, String>();
|
|
||||||
|
|
||||||
if (listDeadNodes) {
|
|
||||||
// Put all nodes referenced in the hosts files in the map
|
|
||||||
Iterator<String> it = hostsReader.getHosts().iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
mustList.put(it.next(), "");
|
|
||||||
}
|
|
||||||
it = hostsReader.getExcludedHosts().iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
mustList.put(it.next(), "");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ArrayList<DatanodeDescriptor> nodes = null;
|
ArrayList<DatanodeDescriptor> nodes = null;
|
||||||
|
final MutableEntrySet foundNodes = new MutableEntrySet();
|
||||||
synchronized(datanodeMap) {
|
synchronized(datanodeMap) {
|
||||||
nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
|
nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size());
|
||||||
mustList.size());
|
|
||||||
Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
|
Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
DatanodeDescriptor dn = it.next();
|
DatanodeDescriptor dn = it.next();
|
||||||
|
@ -1102,47 +1088,45 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
|
||||||
if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
|
if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
|
||||||
nodes.add(dn);
|
nodes.add(dn);
|
||||||
}
|
}
|
||||||
for (String name : getNodeNamesForHostFiltering(dn)) {
|
foundNodes.add(dn);
|
||||||
mustList.remove(name);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (listDeadNodes) {
|
if (listDeadNodes) {
|
||||||
Iterator<String> it = mustList.keySet().iterator();
|
final EntrySet includedNodes = hostFileManager.getIncludes();
|
||||||
while (it.hasNext()) {
|
final EntrySet excludedNodes = hostFileManager.getExcludes();
|
||||||
|
for (Entry entry : includedNodes) {
|
||||||
|
if ((foundNodes.find(entry) == null) &&
|
||||||
|
(excludedNodes.find(entry) == null)) {
|
||||||
// The remaining nodes are ones that are referenced by the hosts
|
// The remaining nodes are ones that are referenced by the hosts
|
||||||
// files but that we do not know about, ie that we have never
|
// files but that we do not know about, ie that we have never
|
||||||
// head from. Eg. a host that is no longer part of the cluster
|
// head from. Eg. an entry that is no longer part of the cluster
|
||||||
// or a bogus entry was given in the hosts files
|
// or a bogus entry was given in the hosts files
|
||||||
DatanodeID dnId = parseDNFromHostsEntry(it.next());
|
//
|
||||||
DatanodeDescriptor dn = new DatanodeDescriptor(dnId);
|
// If the host file entry specified the xferPort, we use that.
|
||||||
|
// Otherwise, we guess that it is the default xfer port.
|
||||||
|
// We can't ask the DataNode what it had configured, because it's
|
||||||
|
// dead.
|
||||||
|
DatanodeDescriptor dn =
|
||||||
|
new DatanodeDescriptor(new DatanodeID(entry.getIpAddress(),
|
||||||
|
entry.getPrefix(), "",
|
||||||
|
entry.getPort() == 0 ? defaultXferPort : entry.getPort(),
|
||||||
|
defaultInfoPort, defaultIpcPort));
|
||||||
dn.setLastUpdate(0); // Consider this node dead for reporting
|
dn.setLastUpdate(0); // Consider this node dead for reporting
|
||||||
nodes.add(dn);
|
nodes.add(dn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("getDatanodeListForReport with " +
|
||||||
|
"includedNodes = " + hostFileManager.getIncludes() +
|
||||||
|
", excludedNodes = " + hostFileManager.getExcludes() +
|
||||||
|
", foundNodes = " + foundNodes +
|
||||||
|
", nodes = " + nodes);
|
||||||
|
}
|
||||||
return nodes;
|
return nodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<String> getNodeNamesForHostFiltering(DatanodeID node) {
|
|
||||||
String ip = node.getIpAddr();
|
|
||||||
String regHostName = node.getHostName();
|
|
||||||
int xferPort = node.getXferPort();
|
|
||||||
|
|
||||||
List<String> names = new ArrayList<String>();
|
|
||||||
names.add(ip);
|
|
||||||
names.add(ip + ":" + xferPort);
|
|
||||||
names.add(regHostName);
|
|
||||||
names.add(regHostName + ":" + xferPort);
|
|
||||||
|
|
||||||
String peerHostName = node.getPeerHostName();
|
|
||||||
if (peerHostName != null) {
|
|
||||||
names.add(peerHostName);
|
|
||||||
names.add(peerHostName + ":" + xferPort);
|
|
||||||
}
|
|
||||||
return names;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if name resolution was successful for the given address. If IP
|
* Checks if name resolution was successful for the given address. If IP
|
||||||
* address and host name are the same, then it means name resolution has
|
* address and host name are the same, then it means name resolution has
|
||||||
|
|
|
@ -936,7 +936,8 @@ private void registerMXBean() {
|
||||||
MBeans.register("DataNode", "DataNodeInfo", this);
|
MBeans.register("DataNode", "DataNodeInfo", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
int getXferPort() {
|
@VisibleForTesting
|
||||||
|
public int getXferPort() {
|
||||||
return streamingAddr.getPort();
|
return streamingAddr.getPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -222,6 +222,7 @@ public void testRegistrationWithDifferentSoftwareVersions() throws Exception {
|
||||||
|
|
||||||
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
|
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
|
||||||
doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
|
doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
|
||||||
|
doReturn(123).when(mockDnReg).getXferPort();
|
||||||
doReturn("fake-storage-id").when(mockDnReg).getStorageID();
|
doReturn("fake-storage-id").when(mockDnReg).getStorageID();
|
||||||
doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
|
doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
|
||||||
|
|
||||||
|
@ -275,6 +276,7 @@ public void testRegistrationWithDifferentSoftwareVersionsDuringUpgrade()
|
||||||
// Should succeed when software versions are the same and CTimes are the
|
// Should succeed when software versions are the same and CTimes are the
|
||||||
// same.
|
// same.
|
||||||
doReturn(VersionInfo.getVersion()).when(mockDnReg).getSoftwareVersion();
|
doReturn(VersionInfo.getVersion()).when(mockDnReg).getSoftwareVersion();
|
||||||
|
doReturn(123).when(mockDnReg).getXferPort();
|
||||||
rpcServer.registerDatanode(mockDnReg);
|
rpcServer.registerDatanode(mockDnReg);
|
||||||
|
|
||||||
// Should succeed when software versions are the same and CTimes are
|
// Should succeed when software versions are the same and CTimes are
|
||||||
|
|
|
@ -24,7 +24,9 @@
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -41,9 +43,11 @@
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -75,6 +79,7 @@ public void setup() throws IOException {
|
||||||
Path dir = new Path(workingDir, System.getProperty("test.build.data", "target/test/data") + "/work-dir/decommission");
|
Path dir = new Path(workingDir, System.getProperty("test.build.data", "target/test/data") + "/work-dir/decommission");
|
||||||
hostsFile = new Path(dir, "hosts");
|
hostsFile = new Path(dir, "hosts");
|
||||||
excludeFile = new Path(dir, "exclude");
|
excludeFile = new Path(dir, "exclude");
|
||||||
|
HostFileManager.dnsResolutionDisabledForTesting = false;
|
||||||
|
|
||||||
// Setup conf
|
// Setup conf
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
|
||||||
|
@ -96,6 +101,7 @@ public void teardown() throws IOException {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
HostFileManager.dnsResolutionDisabledForTesting = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeConfigFile(Path name, ArrayList<String> nodes)
|
private void writeConfigFile(Path name, ArrayList<String> nodes)
|
||||||
|
@ -327,7 +333,7 @@ private void verifyStats(NameNode namenode, FSNamesystem fsn,
|
||||||
/**
|
/**
|
||||||
* Tests decommission for non federated cluster
|
* Tests decommission for non federated cluster
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=360000)
|
||||||
public void testDecommission() throws IOException {
|
public void testDecommission() throws IOException {
|
||||||
testDecommission(1, 6);
|
testDecommission(1, 6);
|
||||||
}
|
}
|
||||||
|
@ -335,7 +341,7 @@ public void testDecommission() throws IOException {
|
||||||
/**
|
/**
|
||||||
* Tests recommission for non federated cluster
|
* Tests recommission for non federated cluster
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=360000)
|
||||||
public void testRecommission() throws IOException {
|
public void testRecommission() throws IOException {
|
||||||
testRecommission(1, 6);
|
testRecommission(1, 6);
|
||||||
}
|
}
|
||||||
|
@ -343,7 +349,7 @@ public void testRecommission() throws IOException {
|
||||||
/**
|
/**
|
||||||
* Test decommission for federeated cluster
|
* Test decommission for federeated cluster
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=360000)
|
||||||
public void testDecommissionFederation() throws IOException {
|
public void testDecommissionFederation() throws IOException {
|
||||||
testDecommission(2, 2);
|
testDecommission(2, 2);
|
||||||
}
|
}
|
||||||
|
@ -445,7 +451,7 @@ private void testRecommission(int numNamenodes, int numDatanodes)
|
||||||
* Tests cluster storage statistics during decommissioning for non
|
* Tests cluster storage statistics during decommissioning for non
|
||||||
* federated cluster
|
* federated cluster
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=360000)
|
||||||
public void testClusterStats() throws Exception {
|
public void testClusterStats() throws Exception {
|
||||||
testClusterStats(1);
|
testClusterStats(1);
|
||||||
}
|
}
|
||||||
|
@ -454,7 +460,7 @@ public void testClusterStats() throws Exception {
|
||||||
* Tests cluster storage statistics during decommissioning for
|
* Tests cluster storage statistics during decommissioning for
|
||||||
* federated cluster
|
* federated cluster
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=360000)
|
||||||
public void testClusterStatsFederation() throws Exception {
|
public void testClusterStatsFederation() throws Exception {
|
||||||
testClusterStats(3);
|
testClusterStats(3);
|
||||||
}
|
}
|
||||||
|
@ -491,7 +497,7 @@ public void testClusterStats(int numNameNodes) throws IOException,
|
||||||
* in the include file are allowed to connect to the namenode in a non
|
* in the include file are allowed to connect to the namenode in a non
|
||||||
* federated cluster.
|
* federated cluster.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=360000)
|
||||||
public void testHostsFile() throws IOException, InterruptedException {
|
public void testHostsFile() throws IOException, InterruptedException {
|
||||||
// Test for a single namenode cluster
|
// Test for a single namenode cluster
|
||||||
testHostsFile(1);
|
testHostsFile(1);
|
||||||
|
@ -502,7 +508,7 @@ public void testHostsFile() throws IOException, InterruptedException {
|
||||||
* in the include file are allowed to connect to the namenode in a
|
* in the include file are allowed to connect to the namenode in a
|
||||||
* federated cluster.
|
* federated cluster.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=360000)
|
||||||
public void testHostsFileFederation() throws IOException, InterruptedException {
|
public void testHostsFileFederation() throws IOException, InterruptedException {
|
||||||
// Test for 3 namenode federated cluster
|
// Test for 3 namenode federated cluster
|
||||||
testHostsFile(3);
|
testHostsFile(3);
|
||||||
|
@ -519,8 +525,8 @@ public void testHostsFile(int numNameNodes) throws IOException,
|
||||||
// Now empty hosts file and ensure the datanode is disallowed
|
// Now empty hosts file and ensure the datanode is disallowed
|
||||||
// from talking to namenode, resulting in it's shutdown.
|
// from talking to namenode, resulting in it's shutdown.
|
||||||
ArrayList<String>list = new ArrayList<String>();
|
ArrayList<String>list = new ArrayList<String>();
|
||||||
final String badHostname = "BOGUSHOST";
|
final String bogusIp = "127.0.30.1";
|
||||||
list.add(badHostname);
|
list.add(bogusIp);
|
||||||
writeConfigFile(hostsFile, list);
|
writeConfigFile(hostsFile, list);
|
||||||
|
|
||||||
for (int j = 0; j < numNameNodes; j++) {
|
for (int j = 0; j < numNameNodes; j++) {
|
||||||
|
@ -544,7 +550,199 @@ public void testHostsFile(int numNameNodes) throws IOException,
|
||||||
assertEquals("There should be 2 dead nodes", 2, info.length);
|
assertEquals("There should be 2 dead nodes", 2, info.length);
|
||||||
DatanodeID id = cluster.getDataNodes().get(0).getDatanodeId();
|
DatanodeID id = cluster.getDataNodes().get(0).getDatanodeId();
|
||||||
assertEquals(id.getHostName(), info[0].getHostName());
|
assertEquals(id.getHostName(), info[0].getHostName());
|
||||||
assertEquals(badHostname, info[1].getHostName());
|
assertEquals(bogusIp, info[1].getHostName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=360000)
|
||||||
|
public void testDuplicateHostsEntries() throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
Configuration hdfsConf = new Configuration(conf);
|
||||||
|
cluster = new MiniDFSCluster.Builder(hdfsConf)
|
||||||
|
.numDataNodes(1).setupHostsFile(true).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
int dnPort = cluster.getDataNodes().get(0).getXferPort();
|
||||||
|
|
||||||
|
// pick some random ports that don't overlap with our DN's port
|
||||||
|
// or with each other.
|
||||||
|
Random random = new Random(System.currentTimeMillis());
|
||||||
|
int port1 = dnPort;
|
||||||
|
while (port1 == dnPort) {
|
||||||
|
port1 = random.nextInt(6000) + 1000;
|
||||||
|
}
|
||||||
|
int port2 = dnPort;
|
||||||
|
while ((port2 == dnPort) || (port2 == port1)) {
|
||||||
|
port2 = random.nextInt(6000) + 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now empty hosts file and ensure the datanode is disallowed
|
||||||
|
// from talking to namenode, resulting in it's shutdown.
|
||||||
|
ArrayList<String> nodes = new ArrayList<String>();
|
||||||
|
|
||||||
|
// These entries will be de-duped by the NameNode, since they refer
|
||||||
|
// to the same IP address + port combo.
|
||||||
|
nodes.add("127.0.0.1:" + port1);
|
||||||
|
nodes.add("localhost:" + port1);
|
||||||
|
nodes.add("127.0.0.1:" + port1);
|
||||||
|
|
||||||
|
// The following entries should not be de-duped.
|
||||||
|
nodes.add("127.0.0.1:" + port2);
|
||||||
|
nodes.add("127.0.30.1:" + port1);
|
||||||
|
writeConfigFile(hostsFile, nodes);
|
||||||
|
|
||||||
|
refreshNodes(cluster.getNamesystem(0), hdfsConf);
|
||||||
|
|
||||||
|
DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
|
||||||
|
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
|
||||||
|
for (int i = 0 ; i < 5 && info.length != 0; i++) {
|
||||||
|
LOG.info("Waiting for datanode to be marked dead");
|
||||||
|
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
|
||||||
|
info = client.datanodeReport(DatanodeReportType.LIVE);
|
||||||
|
}
|
||||||
|
assertEquals("Number of live nodes should be 0", 0, info.length);
|
||||||
|
|
||||||
|
// Test that non-live and bogus hostnames are considered "dead".
|
||||||
|
// The dead report should have an entry for (1) the DN that is
|
||||||
|
// now considered dead because it is no longer allowed to connect
|
||||||
|
// and (2) the bogus entries in the hosts file.
|
||||||
|
DatanodeInfo deadDns[] = client.datanodeReport(DatanodeReportType.DEAD);
|
||||||
|
HashMap<String, DatanodeInfo> deadByXferAddr =
|
||||||
|
new HashMap<String, DatanodeInfo>();
|
||||||
|
for (DatanodeInfo dn : deadDns) {
|
||||||
|
LOG.info("DEAD DatanodeInfo: xferAddr = " + dn.getXferAddr() +
|
||||||
|
", ipAddr = " + dn.getIpAddr() +
|
||||||
|
", hostname = " + dn.getHostName());
|
||||||
|
deadByXferAddr.put(dn.getXferAddr(), dn);
|
||||||
|
}
|
||||||
|
// The real DataNode should be included in the list.
|
||||||
|
String realDnIpPort = cluster.getDataNodes().get(0).
|
||||||
|
getXferAddress().getAddress().getHostAddress() + ":" +
|
||||||
|
cluster.getDataNodes().get(0).getXferPort();
|
||||||
|
Assert.assertNotNull("failed to find real datanode IP " + realDnIpPort,
|
||||||
|
deadByXferAddr.remove(realDnIpPort));
|
||||||
|
// The fake datanode with address 127.0.30.1 should be included in this list.
|
||||||
|
Assert.assertNotNull(deadByXferAddr.remove(
|
||||||
|
"127.0.30.1:" + port1));
|
||||||
|
// Now look for the two copies of 127.0.0.1 with port1 and port2.
|
||||||
|
Iterator<Map.Entry<String, DatanodeInfo>> iter =
|
||||||
|
deadByXferAddr.entrySet().iterator();
|
||||||
|
boolean foundPort1 = false, foundPort2 = false;
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Map.Entry<String, DatanodeInfo> entry = iter.next();
|
||||||
|
DatanodeInfo dn = entry.getValue();
|
||||||
|
if (dn.getXferPort() == port1) {
|
||||||
|
foundPort1 = true;
|
||||||
|
iter.remove();
|
||||||
|
} else if (dn.getXferPort() == port2) {
|
||||||
|
foundPort2 = true;
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertTrue("did not find a dead entry with port " + port1,
|
||||||
|
foundPort1);
|
||||||
|
Assert.assertTrue("did not find a dead entry with port " + port2,
|
||||||
|
foundPort2);
|
||||||
|
Assert.assertTrue(deadByXferAddr.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=360000)
|
||||||
|
public void testIncludeByRegistrationName() throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
Configuration hdfsConf = new Configuration(conf);
|
||||||
|
final String registrationName = "--registration-name--";
|
||||||
|
final String nonExistentDn = "127.0.0.40";
|
||||||
|
hdfsConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName);
|
||||||
|
cluster = new MiniDFSCluster.Builder(hdfsConf)
|
||||||
|
.numDataNodes(1).checkDataNodeHostConfig(true)
|
||||||
|
.setupHostsFile(true).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Set up an includes file that doesn't have our datanode.
|
||||||
|
ArrayList<String> nodes = new ArrayList<String>();
|
||||||
|
nodes.add(nonExistentDn);
|
||||||
|
writeConfigFile(hostsFile, nodes);
|
||||||
|
refreshNodes(cluster.getNamesystem(0), hdfsConf);
|
||||||
|
|
||||||
|
// Wait for the DN to be marked dead.
|
||||||
|
DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
|
||||||
|
while (true) {
|
||||||
|
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
|
||||||
|
if (info.length == 1) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("Waiting for datanode to be marked dead");
|
||||||
|
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a non-empty include file with our registration name.
|
||||||
|
// It should work.
|
||||||
|
int dnPort = cluster.getDataNodes().get(0).getXferPort();
|
||||||
|
nodes = new ArrayList<String>();
|
||||||
|
nodes.add(registrationName + ":" + dnPort);
|
||||||
|
writeConfigFile(hostsFile, nodes);
|
||||||
|
refreshNodes(cluster.getNamesystem(0), hdfsConf);
|
||||||
|
cluster.restartDataNode(0);
|
||||||
|
|
||||||
|
// Wait for the DN to come back.
|
||||||
|
while (true) {
|
||||||
|
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
|
||||||
|
if (info.length == 1) {
|
||||||
|
Assert.assertFalse(info[0].isDecommissioned());
|
||||||
|
Assert.assertFalse(info[0].isDecommissionInProgress());
|
||||||
|
assertEquals(registrationName, info[0].getHostName());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("Waiting for datanode to come back");
|
||||||
|
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=360000)
|
||||||
|
public void testBackgroundDnsResolution() throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
// Create cluster
|
||||||
|
Configuration hdfsConf = new Configuration(conf);
|
||||||
|
hdfsConf.setInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS, 1);
|
||||||
|
cluster = new MiniDFSCluster.Builder(hdfsConf)
|
||||||
|
.numDataNodes(1).checkDataNodeHostConfig(true)
|
||||||
|
.setupHostsFile(true).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Set up an includes file with just 127.0.0.1
|
||||||
|
ArrayList<String> nodes = new ArrayList<String>();
|
||||||
|
String localhost = java.net.InetAddress.getLocalHost().getHostName();
|
||||||
|
nodes.add(localhost);
|
||||||
|
writeConfigFile(hostsFile, nodes);
|
||||||
|
HostFileManager.dnsResolutionDisabledForTesting = true;
|
||||||
|
refreshNodes(cluster.getNamesystem(0), hdfsConf);
|
||||||
|
|
||||||
|
// The DN will be marked dead because DNS resolution was turned off,
|
||||||
|
// and we are in the include file by hostname only.
|
||||||
|
DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
|
||||||
|
while (true) {
|
||||||
|
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
|
||||||
|
if (info.length == 1) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("Waiting for datanode to be marked dead");
|
||||||
|
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow hostname resolution
|
||||||
|
HostFileManager.dnsResolutionDisabledForTesting = false;
|
||||||
|
cluster.restartDataNode(0);
|
||||||
|
|
||||||
|
// Wait for the DN to come back.
|
||||||
|
while (true) {
|
||||||
|
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
|
||||||
|
if (info.length == 1) {
|
||||||
|
Assert.assertFalse(info[0].isDecommissioned());
|
||||||
|
Assert.assertFalse(info[0].isDecommissionInProgress());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("Waiting for datanode to come back");
|
||||||
|
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue