HDFS-9005. Provide configuration support for upgrade domain.

This commit is contained in:
Ming Ma 2017-05-02 06:53:32 -07:00
parent c9bf21b0f3
commit c4c5533216
23 changed files with 1290 additions and 264 deletions

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
/**
* The class describes the configured admin properties for a datanode.
*
* It is the static configuration specified by administrators via dfsadmin
* command; different from the runtime state. CombinedHostFileManager uses
* the class to deserialize the configurations from json-based file format.
*
* To decommission a node, use AdminStates.DECOMMISSIONED.
*/
public class DatanodeAdminProperties {
private String hostName;
private int port;
private String upgradeDomain;
private AdminStates adminState = AdminStates.NORMAL;
/**
* Return the host name of the datanode.
* @return the host name of the datanode.
*/
public String getHostName() {
return hostName;
}
/**
* Set the host name of the datanode.
* @param hostName the host name of the datanode.
*/
public void setHostName(final String hostName) {
this.hostName = hostName;
}
/**
* Get the port number of the datanode.
* @return the port number of the datanode.
*/
public int getPort() {
return port;
}
/**
* Set the port number of the datanode.
* @param port the port number of the datanode.
*/
public void setPort(final int port) {
this.port = port;
}
/**
* Get the upgrade domain of the datanode.
* @return the upgrade domain of the datanode.
*/
public String getUpgradeDomain() {
return upgradeDomain;
}
/**
* Set the upgrade domain of the datanode.
* @param upgradeDomain the upgrade domain of the datanode.
*/
public void setUpgradeDomain(final String upgradeDomain) {
this.upgradeDomain = upgradeDomain;
}
/**
* Get the admin state of the datanode.
* @return the admin state of the datanode.
*/
public AdminStates getAdminState() {
return adminState;
}
/**
* Set the admin state of the datanode.
* @param adminState the admin state of the datanode.
*/
public void setAdminState(final AdminStates adminState) {
this.adminState = adminState;
}
}

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
/** /**
* This class represents the primary identifier for a Datanode. * This class represents the primary identifier for a Datanode.
* Datanodes are identified by how they can be contacted (hostname * Datanodes are identified by how they can be contacted (hostname
@ -272,4 +274,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
public int compareTo(DatanodeID that) { public int compareTo(DatanodeID that) {
return getXferAddr().compareTo(that.getXferAddr()); return getXferAddr().compareTo(that.getXferAddr());
} }
public InetSocketAddress getResolvedAddress() {
return new InetSocketAddress(this.getIpAddr(), this.getXferPort());
}
} }

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Iterator;
import java.util.Set;
import java.util.HashSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
/**
* Reader support for JSON based datanode configuration, an alternative
* to the exclude/include files configuration.
* The JSON file format is the array of elements where each element
* in the array describes the properties of a datanode. The properties of
* a datanode is defined in {@link DatanodeAdminProperties}. For example,
*
* {"hostName": "host1"}
* {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"}
* {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"}
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
public final class CombinedHostsFileReader {
private CombinedHostsFileReader() {
}
/**
* Deserialize a set of DatanodeAdminProperties from a json file.
* @param hostsFile the input json file to read from.
* @return the set of DatanodeAdminProperties
* @throws IOException
*/
public static Set<DatanodeAdminProperties>
readFile(final String hostsFile) throws IOException {
HashSet<DatanodeAdminProperties> allDNs = new HashSet<>();
ObjectMapper mapper = new ObjectMapper();
try (Reader input =
new InputStreamReader(new FileInputStream(hostsFile), "UTF-8")) {
Iterator<DatanodeAdminProperties> iterator =
mapper.readValues(new JsonFactory().createJsonParser(input),
DatanodeAdminProperties.class);
while (iterator.hasNext()) {
DatanodeAdminProperties properties = iterator.next();
allDNs.add(properties);
}
}
return allDNs;
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
/**
* Writer support for JSON based datanode configuration, an alternative
* to the exclude/include files configuration.
* The JSON file format is the array of elements where each element
* in the array describes the properties of a datanode. The properties of
* a datanode is defined in {@link DatanodeAdminProperties}. For example,
*
* {"hostName": "host1"}
* {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"}
* {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"}
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
public final class CombinedHostsFileWriter {
private CombinedHostsFileWriter() {
}
/**
* Serialize a set of DatanodeAdminProperties to a json file.
* @param hostsFile the json file name.
* @param allDNs the set of DatanodeAdminProperties
* @throws IOException
*/
public static void writeFile(final String hostsFile,
final Set<DatanodeAdminProperties> allDNs) throws IOException {
StringBuilder configs = new StringBuilder();
try (Writer output =
new OutputStreamWriter(new FileOutputStream(hostsFile), "UTF-8")) {
for (DatanodeAdminProperties datanodeAdminProperties: allDNs) {
ObjectMapper mapper = new ObjectMapper();
configs.append(mapper.writeValueAsString(datanodeAdminProperties));
}
output.write(configs.toString());
}
}
}

View File

@ -426,12 +426,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals"; public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
public static final String DFS_DATANODE_HOST_NAME_KEY = public static final String DFS_DATANODE_HOST_NAME_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY; HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY; HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY; HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY;
public static final String DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY =
"dfs.namenode.hosts.provider.classname";
public static final String DFS_HOSTS = "dfs.hosts"; public static final String DFS_HOSTS = "dfs.hosts";
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude"; public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers"; public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";

View File

@ -0,0 +1,250 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Collections2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Predicate;
import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
/**
* This class manages datanode configuration using a json file.
* Please refer to {@link CombinedHostsFileReader} for the json format.
* <p/>
* <p/>
* Entries may or may not specify a port. If they don't, we consider
* them to apply to every DataNode on that host. The code canonicalizes the
* entries into IP addresses.
* <p/>
* <p/>
* The code ignores all entries that the DNS fails to resolve their IP
* addresses. This is okay because by default the NN rejects the registrations
* of DNs when it fails to do a forward and reverse lookup. Note that DNS
* resolutions are only done during the loading time to minimize the latency.
*/
public class CombinedHostFileManager extends HostConfigManager {
private static final Logger LOG = LoggerFactory.getLogger(
CombinedHostFileManager.class);
private Configuration conf;
private HostProperties hostProperties = new HostProperties();
static class HostProperties {
private Multimap<InetAddress, DatanodeAdminProperties> allDNs =
HashMultimap.create();
// optimization. If every node in the file isn't in service, it implies
// any node is allowed to register with nn. This is equivalent to having
// an empty "include" file.
private boolean emptyInServiceNodeLists = true;
synchronized void add(InetAddress addr,
DatanodeAdminProperties properties) {
allDNs.put(addr, properties);
if (properties.getAdminState().equals(
AdminStates.NORMAL)) {
emptyInServiceNodeLists = false;
}
}
// If the includes list is empty, act as if everything is in the
// includes list.
synchronized boolean isIncluded(final InetSocketAddress address) {
return emptyInServiceNodeLists || Iterables.any(
allDNs.get(address.getAddress()),
new Predicate<DatanodeAdminProperties>() {
public boolean apply(DatanodeAdminProperties input) {
return input.getPort() == 0 ||
input.getPort() == address.getPort();
}
});
}
synchronized boolean isExcluded(final InetSocketAddress address) {
return Iterables.any(allDNs.get(address.getAddress()),
new Predicate<DatanodeAdminProperties>() {
public boolean apply(DatanodeAdminProperties input) {
return input.getAdminState().equals(
AdminStates.DECOMMISSIONED) &&
(input.getPort() == 0 ||
input.getPort() == address.getPort());
}
});
}
synchronized String getUpgradeDomain(final InetSocketAddress address) {
Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
allDNs.get(address.getAddress()),
new Predicate<DatanodeAdminProperties>() {
public boolean apply(DatanodeAdminProperties input) {
return (input.getPort() == 0 ||
input.getPort() == address.getPort());
}
});
return datanode.iterator().hasNext() ?
datanode.iterator().next().getUpgradeDomain() : null;
}
Iterable<InetSocketAddress> getIncludes() {
return new Iterable<InetSocketAddress>() {
@Override
public Iterator<InetSocketAddress> iterator() {
return new HostIterator(allDNs.entries());
}
};
}
Iterable<InetSocketAddress> getExcludes() {
return new Iterable<InetSocketAddress>() {
@Override
public Iterator<InetSocketAddress> iterator() {
return new HostIterator(
Collections2.filter(allDNs.entries(),
new Predicate<java.util.Map.Entry<InetAddress,
DatanodeAdminProperties>>() {
public boolean apply(java.util.Map.Entry<InetAddress,
DatanodeAdminProperties> entry) {
return entry.getValue().getAdminState().equals(
AdminStates.DECOMMISSIONED);
}
}
));
}
};
}
static class HostIterator extends UnmodifiableIterator<InetSocketAddress> {
private final Iterator<Map.Entry<InetAddress,
DatanodeAdminProperties>> it;
public HostIterator(Collection<java.util.Map.Entry<InetAddress,
DatanodeAdminProperties>> nodes) {
this.it = nodes.iterator();
}
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public InetSocketAddress next() {
Map.Entry<InetAddress, DatanodeAdminProperties> e = it.next();
return new InetSocketAddress(e.getKey(), e.getValue().getPort());
}
}
}
@Override
public Iterable<InetSocketAddress> getIncludes() {
return hostProperties.getIncludes();
}
@Override
public Iterable<InetSocketAddress> getExcludes() {
return hostProperties.getExcludes();
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void refresh() throws IOException {
refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""));
}
private void refresh(final String hostsFile) throws IOException {
HostProperties hostProps = new HostProperties();
Set<DatanodeAdminProperties> all =
CombinedHostsFileReader.readFile(hostsFile);
for(DatanodeAdminProperties properties : all) {
InetSocketAddress addr = parseEntry(hostsFile,
properties.getHostName(), properties.getPort());
if (addr != null) {
hostProps.add(addr.getAddress(), properties);
}
}
refresh(hostProps);
}
@VisibleForTesting
static InetSocketAddress parseEntry(final String fn, final String hostName,
final int port) {
InetSocketAddress addr = new InetSocketAddress(hostName, port);
if (addr.isUnresolved()) {
LOG.warn("Failed to resolve {} in {}. ", hostName, fn);
return null;
}
return addr;
}
@Override
public synchronized boolean isIncluded(final DatanodeID dn) {
return hostProperties.isIncluded(dn.getResolvedAddress());
}
@Override
public synchronized boolean isExcluded(final DatanodeID dn) {
return isExcluded(dn.getResolvedAddress());
}
private boolean isExcluded(final InetSocketAddress address) {
return hostProperties.isExcluded(address);
}
@Override
public synchronized String getUpgradeDomain(final DatanodeID dn) {
return hostProperties.getUpgradeDomain(dn.getResolvedAddress());
}
/**
* Set the properties lists by the new instances. The
* old instance is discarded.
* @param hostProperties the new properties list
*/
@VisibleForTesting
private void refresh(final HostProperties hostProperties) {
synchronized (this) {
this.hostProperties = hostProperties;
}
}
}

View File

@ -105,7 +105,7 @@ public class DatanodeManager {
private final int defaultIpcPort; private final int defaultIpcPort;
/** Read include/exclude files*/ /** Read include/exclude files*/
private final HostFileManager hostFileManager = new HostFileManager(); private HostConfigManager hostConfigManager;
/** The period to wait for datanode heartbeat.*/ /** The period to wait for datanode heartbeat.*/
private long heartbeatExpireInterval; private long heartbeatExpireInterval;
@ -198,9 +198,11 @@ public class DatanodeManager {
this.defaultIpcPort = NetUtils.createSocketAddr( this.defaultIpcPort = NetUtils.createSocketAddr(
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
this.hostConfigManager = ReflectionUtils.newInstance(
conf.getClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
HostFileManager.class, HostConfigManager.class), conf);
try { try {
this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""), this.hostConfigManager.refresh();
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
} catch (IOException e) { } catch (IOException e) {
LOG.error("error reading hosts files: ", e); LOG.error("error reading hosts files: ", e);
} }
@ -218,7 +220,7 @@ public class DatanodeManager {
// in the cache; so future calls to resolve will be fast. // in the cache; so future calls to resolve will be fast.
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
final ArrayList<String> locations = new ArrayList<>(); final ArrayList<String> locations = new ArrayList<>();
for (InetSocketAddress addr : hostFileManager.getIncludes()) { for (InetSocketAddress addr : hostConfigManager.getIncludes()) {
locations.add(addr.getAddress().getHostAddress()); locations.add(addr.getAddress().getHostAddress());
} }
dnsToSwitchMapping.resolve(locations); dnsToSwitchMapping.resolve(locations);
@ -331,8 +333,8 @@ public class DatanodeManager {
return decomManager; return decomManager;
} }
HostFileManager getHostFileManager() { public HostConfigManager getHostConfigManager() {
return hostFileManager; return hostConfigManager;
} }
@VisibleForTesting @VisibleForTesting
@ -622,6 +624,7 @@ public class DatanodeManager {
networktopology.add(node); // may throw InvalidTopologyException networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node); host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node); checkIfClusterIsNowMultiRack(node);
resolveUpgradeDomain(node);
blockManager.getBlockReportLeaseManager().register(node); blockManager.getBlockReportLeaseManager().register(node);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -706,7 +709,14 @@ public class DatanodeManager {
return new HashMap<> (this.datanodesSoftwareVersions); return new HashMap<> (this.datanodesSoftwareVersions);
} }
} }
void resolveUpgradeDomain(DatanodeDescriptor node) {
String upgradeDomain = hostConfigManager.getUpgradeDomain(node);
if (upgradeDomain != null && upgradeDomain.length() > 0) {
node.setUpgradeDomain(upgradeDomain);
}
}
/** /**
* Resolve a node's network location. If the DNS to switch mapping fails * Resolve a node's network location. If the DNS to switch mapping fails
* then this method guarantees default rack location. * then this method guarantees default rack location.
@ -836,7 +846,7 @@ public class DatanodeManager {
*/ */
void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) { void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
// If the registered node is in exclude list, then decommission it // If the registered node is in exclude list, then decommission it
if (getHostFileManager().isExcluded(nodeReg)) { if (getHostConfigManager().isExcluded(nodeReg)) {
decomManager.startDecommission(nodeReg); decomManager.startDecommission(nodeReg);
} }
} }
@ -876,7 +886,7 @@ public class DatanodeManager {
// Checks if the node is not on the hosts list. If it is not, then // Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering. // it will be disallowed from registering.
if (!hostFileManager.isIncluded(nodeReg)) { if (!hostConfigManager.isIncluded(nodeReg)) {
throw new DisallowedDatanodeException(nodeReg); throw new DisallowedDatanodeException(nodeReg);
} }
@ -944,7 +954,8 @@ public class DatanodeManager {
getNetworkDependenciesWithDefault(nodeS)); getNetworkDependenciesWithDefault(nodeS));
} }
getNetworkTopology().add(nodeS); getNetworkTopology().add(nodeS);
resolveUpgradeDomain(nodeS);
// also treat the registration message as a heartbeat // also treat the registration message as a heartbeat
heartbeatManager.register(nodeS); heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion()); incrementVersionCount(nodeS.getSoftwareVersion());
@ -976,7 +987,8 @@ public class DatanodeManager {
} }
networktopology.add(nodeDescr); networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion()); nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
resolveUpgradeDomain(nodeDescr);
// register new datanode // register new datanode
addDatanode(nodeDescr); addDatanode(nodeDescr);
// also treat the registration message as a heartbeat // also treat the registration message as a heartbeat
@ -1030,9 +1042,9 @@ public class DatanodeManager {
// Update the file names and refresh internal includes and excludes list. // Update the file names and refresh internal includes and excludes list.
if (conf == null) { if (conf == null) {
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
this.hostConfigManager.setConf(conf);
} }
this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""), this.hostConfigManager.refresh();
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
} }
/** /**
@ -1044,15 +1056,16 @@ public class DatanodeManager {
private void refreshDatanodes() { private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) { for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include. // Check if not include.
if (!hostFileManager.isIncluded(node)) { if (!hostConfigManager.isIncluded(node)) {
node.setDisallowed(true); // case 2. node.setDisallowed(true); // case 2.
} else { } else {
if (hostFileManager.isExcluded(node)) { if (hostConfigManager.isExcluded(node)) {
decomManager.startDecommission(node); // case 3. decomManager.startDecommission(node); // case 3.
} else { } else {
decomManager.stopDecommission(node); // case 4. decomManager.stopDecommission(node); // case 4.
} }
} }
node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
} }
} }
@ -1260,9 +1273,9 @@ public class DatanodeManager {
type == DatanodeReportType.DECOMMISSIONING; type == DatanodeReportType.DECOMMISSIONING;
ArrayList<DatanodeDescriptor> nodes; ArrayList<DatanodeDescriptor> nodes;
final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet(); final HostSet foundNodes = new HostSet();
final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes(); final Iterable<InetSocketAddress> includedNodes =
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes(); hostConfigManager.getIncludes();
synchronized(datanodeMap) { synchronized(datanodeMap) {
nodes = new ArrayList<>(datanodeMap.size()); nodes = new ArrayList<>(datanodeMap.size());
@ -1273,11 +1286,11 @@ public class DatanodeManager {
if (((listLiveNodes && !isDead) || if (((listLiveNodes && !isDead) ||
(listDeadNodes && isDead) || (listDeadNodes && isDead) ||
(listDecommissioningNodes && isDecommissioning)) && (listDecommissioningNodes && isDecommissioning)) &&
hostFileManager.isIncluded(dn)) { hostConfigManager.isIncluded(dn)) {
nodes.add(dn); nodes.add(dn);
} }
foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn)); foundNodes.add(dn.getResolvedAddress());
} }
} }
Collections.sort(nodes); Collections.sort(nodes);
@ -1301,7 +1314,7 @@ public class DatanodeManager {
addr.getPort() == 0 ? defaultXferPort : addr.getPort(), addr.getPort() == 0 ? defaultXferPort : addr.getPort(),
defaultInfoPort, defaultInfoSecurePort, defaultIpcPort)); defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
setDatanodeDead(dn); setDatanodeDead(dn);
if (excludedNodes.match(addr)) { if (hostConfigManager.isExcluded(dn)) {
dn.setDecommissioned(); dn.setDecommissioned();
} }
nodes.add(dn); nodes.add(dn);
@ -1310,8 +1323,8 @@ public class DatanodeManager {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("getDatanodeListForReport with " + LOG.debug("getDatanodeListForReport with " +
"includedNodes = " + hostFileManager.getIncludes() + "includedNodes = " + hostConfigManager.getIncludes() +
", excludedNodes = " + hostFileManager.getExcludes() + ", excludedNodes = " + hostConfigManager.getExcludes() +
", foundNodes = " + foundNodes + ", foundNodes = " + foundNodes +
", nodes = " + nodes); ", nodes = " + nodes);
} }

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* This interface abstracts how datanode configuration is managed.
*
* Each implementation defines its own way to persist the configuration.
* For example, it can use one JSON file to store the configs for all
* datanodes; or it can use one file to store in-service datanodes and another
* file to store decommission-requested datanodes.
*
* These files control which DataNodes the NameNode expects to see in the
* cluster.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class HostConfigManager implements Configurable {
/**
* Return all the datanodes that are allowed to connect to the namenode.
* @return Iterable of all datanodes
*/
public abstract Iterable<InetSocketAddress> getIncludes();
/**
* Return all datanodes that should be in decommissioned state.
* @return Iterable of those datanodes
*/
public abstract Iterable<InetSocketAddress> getExcludes();
/**
* Check if a datanode is allowed to connect the namenode.
* @param dn the DatanodeID of the datanode
* @return boolean if dn is allowed to connect the namenode.
*/
public abstract boolean isIncluded(DatanodeID dn);
/**
* Check if a datanode needs to be decommissioned.
* @param dn the DatanodeID of the datanode
* @return boolean if dn needs to be decommissioned.
*/
public abstract boolean isExcluded(DatanodeID dn);
/**
* Reload the configuration.
*/
public abstract void refresh() throws IOException;
/**
* Get the upgrade domain of a datanode.
* @param dn the DatanodeID of the datanode
* @return the upgrade domain of dn.
*/
public abstract String getUpgradeDomain(DatanodeID dn);
}

View File

@ -18,28 +18,18 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
/** /**
* This class manages the include and exclude files for HDFS. * This class manages the include and exclude files for HDFS.
@ -59,11 +49,27 @@ import java.util.Map;
* of DNs when it fails to do a forward and reverse lookup. Note that DNS * of DNs when it fails to do a forward and reverse lookup. Note that DNS
* resolutions are only done during the loading time to minimize the latency. * resolutions are only done during the loading time to minimize the latency.
*/ */
class HostFileManager { public class HostFileManager extends HostConfigManager {
private static final Log LOG = LogFactory.getLog(HostFileManager.class); private static final Log LOG = LogFactory.getLog(HostFileManager.class);
private Configuration conf;
private HostSet includes = new HostSet(); private HostSet includes = new HostSet();
private HostSet excludes = new HostSet(); private HostSet excludes = new HostSet();
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void refresh() throws IOException {
refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
}
private static HostSet readFile(String type, String filename) private static HostSet readFile(String type, String filename)
throws IOException { throws IOException {
HostSet res = new HostSet(); HostSet res = new HostSet();
@ -99,31 +105,37 @@ class HostFileManager {
return null; return null;
} }
static InetSocketAddress resolvedAddressFromDatanodeID(DatanodeID id) { @Override
return new InetSocketAddress(id.getIpAddr(), id.getXferPort()); public synchronized HostSet getIncludes() {
}
synchronized HostSet getIncludes() {
return includes; return includes;
} }
synchronized HostSet getExcludes() { @Override
public synchronized HostSet getExcludes() {
return excludes; return excludes;
} }
// If the includes list is empty, act as if everything is in the // If the includes list is empty, act as if everything is in the
// includes list. // includes list.
synchronized boolean isIncluded(DatanodeID dn) { @Override
return includes.isEmpty() || includes.match public synchronized boolean isIncluded(DatanodeID dn) {
(resolvedAddressFromDatanodeID(dn)); return includes.isEmpty() || includes.match(dn.getResolvedAddress());
} }
synchronized boolean isExcluded(DatanodeID dn) { @Override
return excludes.match(resolvedAddressFromDatanodeID(dn)); public synchronized boolean isExcluded(DatanodeID dn) {
return isExcluded(dn.getResolvedAddress());
} }
synchronized boolean hasIncludes() { private boolean isExcluded(InetSocketAddress address) {
return !includes.isEmpty(); return excludes.match(address);
}
@Override
public synchronized String getUpgradeDomain(final DatanodeID dn) {
// The include/exclude files based config doesn't support upgrade domain
// config.
return null;
} }
/** /**
@ -133,7 +145,8 @@ class HostFileManager {
* @param excludeFile the path to the new excludes list * @param excludeFile the path to the new excludes list
* @throws IOException thrown if there is a problem reading one of the files * @throws IOException thrown if there is a problem reading one of the files
*/ */
void refresh(String includeFile, String excludeFile) throws IOException { private void refresh(String includeFile, String excludeFile)
throws IOException {
HostSet newIncludes = readFile("included", includeFile); HostSet newIncludes = readFile("included", includeFile);
HostSet newExcludes = readFile("excluded", excludeFile); HostSet newExcludes = readFile("excluded", excludeFile);
@ -153,84 +166,4 @@ class HostFileManager {
excludes = newExcludes; excludes = newExcludes;
} }
} }
/**
* The HostSet allows efficient queries on matching wildcard addresses.
* <p/>
* For InetSocketAddress A and B with the same host address,
* we define a partial order between A and B, A <= B iff A.getPort() == B
* .getPort() || B.getPort() == 0.
*/
static class HostSet implements Iterable<InetSocketAddress> {
// Host -> lists of ports
private final Multimap<InetAddress, Integer> addrs = HashMultimap.create();
/**
* The function that checks whether there exists an entry foo in the set
* so that foo <= addr.
*/
boolean matchedBy(InetSocketAddress addr) {
Collection<Integer> ports = addrs.get(addr.getAddress());
return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
.getPort());
}
/**
* The function that checks whether there exists an entry foo in the set
* so that addr <= foo.
*/
boolean match(InetSocketAddress addr) {
int port = addr.getPort();
Collection<Integer> ports = addrs.get(addr.getAddress());
boolean exactMatch = ports.contains(port);
boolean genericMatch = ports.contains(0);
return exactMatch || genericMatch;
}
boolean isEmpty() {
return addrs.isEmpty();
}
int size() {
return addrs.size();
}
void add(InetSocketAddress addr) {
Preconditions.checkArgument(!addr.isUnresolved());
addrs.put(addr.getAddress(), addr.getPort());
}
@Override
public Iterator<InetSocketAddress> iterator() {
return new UnmodifiableIterator<InetSocketAddress>() {
private final Iterator<Map.Entry<InetAddress,
Integer>> it = addrs.entries().iterator();
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public InetSocketAddress next() {
Map.Entry<InetAddress, Integer> e = it.next();
return new InetSocketAddress(e.getKey(), e.getValue());
}
};
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("HostSet(");
Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
new Function<InetSocketAddress, String>() {
@Override
public String apply(@Nullable InetSocketAddress addr) {
assert addr != null;
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
}));
return sb.append(")").toString();
}
}
} }

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
import com.google.common.collect.UnmodifiableIterator;
import javax.annotation.Nullable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
/**
* The HostSet allows efficient queries on matching wildcard addresses.
* <p/>
* For InetSocketAddress A and B with the same host address,
* we define a partial order between A and B, A <= B iff A.getPort() == B
* .getPort() || B.getPort() == 0.
*/
public class HostSet implements Iterable<InetSocketAddress> {
// Host -> lists of ports
private final Multimap<InetAddress, Integer> addrs = HashMultimap.create();
/**
* The function that checks whether there exists an entry foo in the set
* so that foo <= addr.
*/
boolean matchedBy(InetSocketAddress addr) {
Collection<Integer> ports = addrs.get(addr.getAddress());
return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
.getPort());
}
/**
* The function that checks whether there exists an entry foo in the set
* so that addr <= foo.
*/
boolean match(InetSocketAddress addr) {
int port = addr.getPort();
Collection<Integer> ports = addrs.get(addr.getAddress());
boolean exactMatch = ports.contains(port);
boolean genericMatch = ports.contains(0);
return exactMatch || genericMatch;
}
boolean isEmpty() {
return addrs.isEmpty();
}
int size() {
return addrs.size();
}
void add(InetSocketAddress addr) {
Preconditions.checkArgument(!addr.isUnresolved());
addrs.put(addr.getAddress(), addr.getPort());
}
@Override
public Iterator<InetSocketAddress> iterator() {
return new UnmodifiableIterator<InetSocketAddress>() {
private final Iterator<Map.Entry<InetAddress,
Integer>> it = addrs.entries().iterator();
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public InetSocketAddress next() {
Map.Entry<InetAddress, Integer> e = it.next();
return new InetSocketAddress(e.getKey(), e.getValue());
}
};
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("HostSet(");
Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
new Function<InetSocketAddress, String>() {
@Override
public String apply(@Nullable InetSocketAddress addr) {
assert addr != null;
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
}));
return sb.append(")").toString();
}
}

View File

@ -3107,4 +3107,19 @@
The size buffer to be used when creating or opening httpfs filesystem IO stream. The size buffer to be used when creating or opening httpfs filesystem IO stream.
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.hosts.provider.classname</name>
<value>org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager</value>
<description>
The class that provides access for host files.
org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager is used
by default which loads files specified by dfs.hosts and dfs.hosts.exclude.
If org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager is
used, it will load the JSON file defined in dfs.hosts.
To change class name, nn restart is required. "dfsadmin -refreshNodes" only
refreshes the configuration files used by the class.
</description>
</property>
</configuration> </configuration>

View File

@ -142,12 +142,16 @@ The `bin/hdfs dfsadmin` command supports a few HDFS administration related opera
during last upgrade. during last upgrade.
* `-refreshNodes`: Updates the namenode with the set of datanodes * `-refreshNodes`: Updates the namenode with the set of datanodes
allowed to connect to the namenode. Namenodes re-read datanode allowed to connect to the namenode. By default, Namenodes re-read datanode
hostnames in the file defined by `dfs.hosts`, `dfs.hosts.exclude` hostnames in the file defined by `dfs.hosts`, `dfs.hosts.exclude`
Hosts defined in `dfs.hosts` are the datanodes that are part of the Hosts defined in `dfs.hosts` are the datanodes that are part of the
cluster. If there are entries in `dfs.hosts`, only the hosts in it cluster. If there are entries in `dfs.hosts`, only the hosts in it
are allowed to register with the namenode. Entries in are allowed to register with the namenode. Entries in
`dfs.hosts.exclude` are datanodes that need to be decommissioned. `dfs.hosts.exclude` are datanodes that need to be decommissioned.
Alternatively if `dfs.namenode.hosts.provider.classname` is set to
`org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager`,
all include and exclude hosts are specified in the JSON file defined by
`dfs.hosts`.
Datanodes complete decommissioning when all the replicas from them Datanodes complete decommissioning when all the replicas from them
are replicated to other datanodes. Decommissioned nodes are not are replicated to other datanodes. Decommissioned nodes are not
automatically shutdown and are not chosen for writing for new automatically shutdown and are not chosen for writing for new

View File

@ -29,15 +29,19 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -48,7 +52,57 @@ public class TestDatanodeReport {
static final Log LOG = LogFactory.getLog(TestDatanodeReport.class); static final Log LOG = LogFactory.getLog(TestDatanodeReport.class);
final static private Configuration conf = new HdfsConfiguration(); final static private Configuration conf = new HdfsConfiguration();
final static private int NUM_OF_DATANODES = 4; final static private int NUM_OF_DATANODES = 4;
/**
* This test verifies upgrade domain is set according to the JSON host file.
*/
@Test
public void testDatanodeReportWithUpgradeDomain() throws Exception {
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); // 0.5s
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
CombinedHostFileManager.class, HostConfigManager.class);
HostsFileWriter hostsFileWriter = new HostsFileWriter();
hostsFileWriter.initialize(conf, "temp/datanodeReport");
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
final DFSClient client = cluster.getFileSystem().dfs;
final String ud1 = "ud1";
final String ud2 = "ud2";
try {
//wait until the cluster is up
cluster.waitActive();
DatanodeAdminProperties datanode = new DatanodeAdminProperties();
datanode.setHostName(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
datanode.setUpgradeDomain(ud1);
hostsFileWriter.initIncludeHosts(
new DatanodeAdminProperties[]{datanode});
client.refreshNodes();
DatanodeInfo[] all = client.datanodeReport(DatanodeReportType.ALL);
assertEquals(all[0].getUpgradeDomain(), ud1);
datanode.setUpgradeDomain(null);
hostsFileWriter.initIncludeHosts(
new DatanodeAdminProperties[]{datanode});
client.refreshNodes();
all = client.datanodeReport(DatanodeReportType.ALL);
assertEquals(all[0].getUpgradeDomain(), null);
datanode.setUpgradeDomain(ud2);
hostsFileWriter.initIncludeHosts(
new DatanodeAdminProperties[]{datanode});
client.refreshNodes();
all = client.datanodeReport(DatanodeReportType.ALL);
assertEquals(all[0].getUpgradeDomain(), ud2);
} finally {
cluster.shutdown();
}
}
/** /**
* This test attempts to different types of datanode report. * This test attempts to different types of datanode report.
*/ */

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Test; import org.junit.Test;
@ -384,17 +385,8 @@ public class TestBlocksWithNotEnoughRacks {
short REPLICATION_FACTOR = 2; short REPLICATION_FACTOR = 2;
final Path filePath = new Path("/testFile"); final Path filePath = new Path("/testFile");
// Configure an excludes file HostsFileWriter hostsFileWriter = new HostsFileWriter();
FileSystem localFileSys = FileSystem.getLocal(conf); hostsFileWriter.initialize(conf, "temp/decommission");
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
Path dir = new Path(workingDir, "temp/decommission");
Path excludeFile = new Path(dir, "exclude");
Path includeFile = new Path(dir, "include");
assertTrue(localFileSys.mkdirs(dir));
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
DFSTestUtil.writeFile(localFileSys, includeFile, "");
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
// Two blocks and four racks // Two blocks and four racks
String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"}; String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
@ -415,7 +407,7 @@ public class TestBlocksWithNotEnoughRacks {
BlockLocation locs[] = fs.getFileBlockLocations( BlockLocation locs[] = fs.getFileBlockLocations(
fs.getFileStatus(filePath), 0, Long.MAX_VALUE); fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
String name = locs[0].getNames()[0]; String name = locs[0].getNames()[0];
DFSTestUtil.writeFile(localFileSys, excludeFile, name); hostsFileWriter.initExcludeHost(name);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf); ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name); DFSTestUtil.waitForDecommission(fs, name);
@ -423,6 +415,7 @@ public class TestBlocksWithNotEnoughRacks {
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0); DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
hostsFileWriter.cleanup();
} }
} }
@ -437,17 +430,8 @@ public class TestBlocksWithNotEnoughRacks {
short REPLICATION_FACTOR = 5; short REPLICATION_FACTOR = 5;
final Path filePath = new Path("/testFile"); final Path filePath = new Path("/testFile");
// Configure an excludes file HostsFileWriter hostsFileWriter = new HostsFileWriter();
FileSystem localFileSys = FileSystem.getLocal(conf); hostsFileWriter.initialize(conf, "temp/decommission");
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
Path dir = new Path(workingDir, "temp/decommission");
Path excludeFile = new Path(dir, "exclude");
Path includeFile = new Path(dir, "include");
assertTrue(localFileSys.mkdirs(dir));
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
DFSTestUtil.writeFile(localFileSys, includeFile, "");
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
// All hosts are on two racks, only one host on /rack2 // All hosts are on two racks, only one host on /rack2
String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"}; String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
@ -473,7 +457,7 @@ public class TestBlocksWithNotEnoughRacks {
for (String top : locs[0].getTopologyPaths()) { for (String top : locs[0].getTopologyPaths()) {
if (!top.startsWith("/rack2")) { if (!top.startsWith("/rack2")) {
String name = top.substring("/rack1".length()+1); String name = top.substring("/rack1".length()+1);
DFSTestUtil.writeFile(localFileSys, excludeFile, name); hostsFileWriter.initExcludeHost(name);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf); ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name); DFSTestUtil.waitForDecommission(fs, name);
break; break;
@ -485,6 +469,7 @@ public class TestBlocksWithNotEnoughRacks {
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0); DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
hostsFileWriter.cleanup();
} }
} }
} }

View File

@ -420,9 +420,9 @@ public class TestDatanodeManager {
DatanodeManager dm = mockDatanodeManager(fsn, new Configuration()); DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
HostFileManager hm = new HostFileManager(); HostFileManager hm = new HostFileManager();
HostFileManager.HostSet noNodes = new HostFileManager.HostSet(); HostSet noNodes = new HostSet();
HostFileManager.HostSet oneNode = new HostFileManager.HostSet(); HostSet oneNode = new HostSet();
HostFileManager.HostSet twoNodes = new HostFileManager.HostSet(); HostSet twoNodes = new HostSet();
DatanodeRegistration dr1 = new DatanodeRegistration( DatanodeRegistration dr1 = new DatanodeRegistration(
new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123", new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123",
12345, 12345, 12345, 12345), 12345, 12345, 12345, 12345),
@ -439,7 +439,7 @@ public class TestDatanodeManager {
oneNode.add(entry("127.0.0.1:23456")); oneNode.add(entry("127.0.0.1:23456"));
hm.refresh(twoNodes, noNodes); hm.refresh(twoNodes, noNodes);
Whitebox.setInternalState(dm, "hostFileManager", hm); Whitebox.setInternalState(dm, "hostConfigManager", hm);
// Register two data nodes to simulate them coming up. // Register two data nodes to simulate them coming up.
// We need to add two nodes, because if we have only one node, removing it // We need to add two nodes, because if we have only one node, removing it

View File

@ -40,7 +40,7 @@ public class TestHostFileManager {
@Test @Test
public void testDeduplication() { public void testDeduplication() {
HostFileManager.HostSet s = new HostFileManager.HostSet(); HostSet s = new HostSet();
// These entries will be de-duped, since they refer to the same IP // These entries will be de-duped, since they refer to the same IP
// address + port combo. // address + port combo.
s.add(entry("127.0.0.1:12345")); s.add(entry("127.0.0.1:12345"));
@ -60,7 +60,7 @@ public class TestHostFileManager {
@Test @Test
public void testRelation() { public void testRelation() {
HostFileManager.HostSet s = new HostFileManager.HostSet(); HostSet s = new HostSet();
s.add(entry("127.0.0.1:123")); s.add(entry("127.0.0.1:123"));
Assert.assertTrue(s.match(entry("127.0.0.1:123"))); Assert.assertTrue(s.match(entry("127.0.0.1:123")));
Assert.assertFalse(s.match(entry("127.0.0.1:12"))); Assert.assertFalse(s.match(entry("127.0.0.1:12")));
@ -105,8 +105,8 @@ public class TestHostFileManager {
FSNamesystem fsn = mock(FSNamesystem.class); FSNamesystem fsn = mock(FSNamesystem.class);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
HostFileManager hm = new HostFileManager(); HostFileManager hm = new HostFileManager();
HostFileManager.HostSet includedNodes = new HostFileManager.HostSet(); HostSet includedNodes = new HostSet();
HostFileManager.HostSet excludedNodes = new HostFileManager.HostSet(); HostSet excludedNodes = new HostSet();
includedNodes.add(entry("127.0.0.1:12345")); includedNodes.add(entry("127.0.0.1:12345"));
includedNodes.add(entry("localhost:12345")); includedNodes.add(entry("localhost:12345"));
@ -122,7 +122,7 @@ public class TestHostFileManager {
hm.refresh(includedNodes, excludedNodes); hm.refresh(includedNodes, excludedNodes);
DatanodeManager dm = new DatanodeManager(bm, fsn, conf); DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
Whitebox.setInternalState(dm, "hostFileManager", hm); Whitebox.setInternalState(dm, "hostConfigManager", hm);
Map<String, DatanodeDescriptor> dnMap = (Map<String, Map<String, DatanodeDescriptor> dnMap = (Map<String,
DatanodeDescriptor>) Whitebox.getInternalState(dm, "datanodeMap"); DatanodeDescriptor>) Whitebox.getInternalState(dm, "datanodeMap");

View File

@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.io.File; import java.util.Arrays;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -37,15 +36,33 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** /**
* DFS_HOSTS and DFS_HOSTS_EXCLUDE tests * DFS_HOSTS and DFS_HOSTS_EXCLUDE tests
* *
*/ */
@RunWith(Parameterized.class)
public class TestHostsFiles { public class TestHostsFiles {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(TestHostsFiles.class.getName()); LogFactory.getLog(TestHostsFiles.class.getName());
private Class hostFileMgrClass;
public TestHostsFiles(Class hostFileMgrClass) {
this.hostFileMgrClass = hostFileMgrClass;
}
@Parameterized.Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][]{
{HostFileManager.class}, {CombinedHostFileManager.class}});
}
/* /*
* Return a configuration object with low timeouts for testing and * Return a configuration object with low timeouts for testing and
@ -72,6 +89,10 @@ public class TestHostsFiles {
// Indicates we have multiple racks // Indicates we have multiple racks
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz"); conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
// Host file manager
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
hostFileMgrClass, HostConfigManager.class);
return conf; return conf;
} }
@ -80,18 +101,8 @@ public class TestHostsFiles {
Configuration conf = getConf(); Configuration conf = getConf();
short REPLICATION_FACTOR = 2; short REPLICATION_FACTOR = 2;
final Path filePath = new Path("/testFile"); final Path filePath = new Path("/testFile");
HostsFileWriter hostsFileWriter = new HostsFileWriter();
// Configure an excludes file hostsFileWriter.initialize(conf, "temp/decommission");
FileSystem localFileSys = FileSystem.getLocal(conf);
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
Path dir = new Path(workingDir, "temp/decommission");
Path excludeFile = new Path(dir, "exclude");
Path includeFile = new Path(dir, "include");
assertTrue(localFileSys.mkdirs(dir));
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
DFSTestUtil.writeFile(localFileSys, includeFile, "");
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
// Two blocks and four racks // Two blocks and four racks
String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"}; String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
@ -112,9 +123,8 @@ public class TestHostsFiles {
BlockLocation locs[] = fs.getFileBlockLocations( BlockLocation locs[] = fs.getFileBlockLocations(
fs.getFileStatus(filePath), 0, Long.MAX_VALUE); fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
String name = locs[0].getNames()[0]; String name = locs[0].getNames()[0];
String names = name + "\n" + "localhost:42\n"; LOG.info("adding '" + name + "' to decommission");
LOG.info("adding '" + names + "' to exclude file " + excludeFile.toUri().getPath()); hostsFileWriter.initExcludeHost(name);
DFSTestUtil.writeFile(localFileSys, excludeFile, name);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf); ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name); DFSTestUtil.waitForDecommission(fs, name);
@ -131,9 +141,7 @@ public class TestHostsFiles {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
if (localFileSys.exists(dir)) { hostsFileWriter.cleanup();
FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
}
} }
} }
@ -141,20 +149,10 @@ public class TestHostsFiles {
public void testHostsIncludeForDeadCount() throws Exception { public void testHostsIncludeForDeadCount() throws Exception {
Configuration conf = getConf(); Configuration conf = getConf();
// Configure an excludes file HostsFileWriter hostsFileWriter = new HostsFileWriter();
FileSystem localFileSys = FileSystem.getLocal(conf); hostsFileWriter.initialize(conf, "temp/decommission");
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); hostsFileWriter.initIncludeHosts(new String[]
Path dir = new Path(workingDir, "temp/decommission"); {"localhost:52","127.0.0.1:7777"});
Path excludeFile = new Path(dir, "exclude");
Path includeFile = new Path(dir, "include");
assertTrue(localFileSys.mkdirs(dir));
StringBuilder includeHosts = new StringBuilder();
includeHosts.append("localhost:52").append("\n").append("127.0.0.1:7777")
.append("\n");
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
try { try {
@ -174,9 +172,7 @@ public class TestHostsFiles {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
if (localFileSys.exists(dir)) { hostsFileWriter.cleanup();
FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
}
} }
} }
} }

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.net.ServerSocketUtil;
@ -44,10 +45,10 @@ import org.mortbay.util.ajax.JSON;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.net.BindException; import java.net.BindException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -238,8 +239,8 @@ public class TestNameNodeMXBean {
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
FileSystem localFileSys = null; HostsFileWriter hostsFileWriter = new HostsFileWriter();
Path dir = null; hostsFileWriter.initialize(conf, "temp/TestNameNodeMXBean");
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@ -251,18 +252,12 @@ public class TestNameNodeMXBean {
ObjectName mxbeanName = new ObjectName( ObjectName mxbeanName = new ObjectName(
"Hadoop:service=NameNode,name=NameNodeInfo"); "Hadoop:service=NameNode,name=NameNodeInfo");
// Define include file to generate deadNodes metrics List<String> hosts = new ArrayList<>();
localFileSys = FileSystem.getLocal(conf);
Path workingDir = localFileSys.getWorkingDirectory();
dir = new Path(workingDir,"build/test/data/temp/TestNameNodeMXBean");
Path includeFile = new Path(dir, "include");
assertTrue(localFileSys.mkdirs(dir));
StringBuilder includeHosts = new StringBuilder();
for(DataNode dn : cluster.getDataNodes()) { for(DataNode dn : cluster.getDataNodes()) {
includeHosts.append(dn.getDisplayName()).append("\n"); hosts.add(dn.getDisplayName());
} }
DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString()); hostsFileWriter.initIncludeHosts(hosts.toArray(
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); new String[hosts.size()]));
fsn.getBlockManager().getDatanodeManager().refreshNodes(conf); fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
cluster.stopDataNode(0); cluster.stopDataNode(0);
@ -284,12 +279,10 @@ public class TestNameNodeMXBean {
assertTrue(deadNode.containsKey("xferaddr")); assertTrue(deadNode.containsKey("xferaddr"));
} }
} finally { } finally {
if ((localFileSys != null) && localFileSys.exists(dir)) {
FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
}
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
hostsFileWriter.cleanup();
} }
} }

View File

@ -33,7 +33,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -590,27 +590,15 @@ public class TestStartup {
@Test @Test
public void testNNRestart() throws IOException, InterruptedException { public void testNNRestart() throws IOException, InterruptedException {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
FileSystem localFileSys;
Path hostsFile;
Path excludeFile;
int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
// Set up the hosts/exclude files.
localFileSys = FileSystem.getLocal(config);
Path workingDir = localFileSys.getWorkingDirectory();
Path dir = new Path(workingDir, "build/test/data/work-dir/restartnn");
hostsFile = new Path(dir, "hosts");
excludeFile = new Path(dir, "exclude");
// Setup conf HostsFileWriter hostsFileWriter = new HostsFileWriter();
config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); hostsFileWriter.initialize(config, "work-dir/restartnn");
writeConfigFile(localFileSys, excludeFile, null);
config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
// write into hosts file
ArrayList<String>list = new ArrayList<String>();
byte b[] = {127, 0, 0, 1}; byte b[] = {127, 0, 0, 1};
InetAddress inetAddress = InetAddress.getByAddress(b); InetAddress inetAddress = InetAddress.getByAddress(b);
list.add(inetAddress.getHostName()); hostsFileWriter.initIncludeHosts(new String[] {inetAddress.getHostName()});
writeConfigFile(localFileSys, hostsFile, list);
int numDatanodes = 1; int numDatanodes = 1;
try { try {
@ -635,37 +623,12 @@ public class TestStartup {
fail(StringUtils.stringifyException(e)); fail(StringUtils.stringifyException(e));
throw e; throw e;
} finally { } finally {
cleanupFile(localFileSys, excludeFile.getParent());
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
hostsFileWriter.cleanup();
} }
} }
private void writeConfigFile(FileSystem localFileSys, Path name,
ArrayList<String> nodes) throws IOException {
// delete if it already exists
if (localFileSys.exists(name)) {
localFileSys.delete(name, true);
}
FSDataOutputStream stm = localFileSys.create(name);
if (nodes != null) {
for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
String node = it.next();
stm.writeBytes(node);
stm.writeBytes("\n");
}
}
stm.close();
}
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
fileSys.delete(name, true);
assertTrue(!fileSys.exists(name));
}
@Test(timeout = 120000) @Test(timeout = 120000)
public void testXattrConfiguration() throws Exception { public void testXattrConfiguration() throws Exception {

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.net.StaticMapping;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* End-to-end test case for upgrade domain
* The test configs upgrade domain for nodes via admin json
* config file and put some nodes to decommission state.
* The test then verifies replicas are placed on the nodes that
* satisfy the upgrade domain policy.
*
*/
public class TestUpgradeDomainBlockPlacementPolicy {
private static final short REPLICATION_FACTOR = (short) 3;
private static final int DEFAULT_BLOCK_SIZE = 1024;
static final String[] racks =
{ "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
/**
* Use host names that can be resolved (
* InetSocketAddress#isUnresolved == false). Otherwise,
* CombinedHostFileManager won't allow those hosts.
*/
static final String[] hosts =
{ "127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1",
"127.0.0.1", "127.0.0.1" };
static final String[] upgradeDomains =
{ "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" };
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
private MiniDFSCluster cluster = null;
private NamenodeProtocols nameNodeRpc = null;
private FSNamesystem namesystem = null;
private PermissionStatus perm = null;
@Before
public void setup() throws IOException {
StaticMapping.resetMap();
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithUpgradeDomain.class,
BlockPlacementPolicy.class);
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
CombinedHostFileManager.class, HostConfigManager.class);
HostsFileWriter hostsFileWriter = new HostsFileWriter();
hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
.hosts(hosts).build();
cluster.waitActive();
nameNodeRpc = cluster.getNameNodeRpc();
namesystem = cluster.getNamesystem();
perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
FsPermission.getDefault());
refreshDatanodeAdminProperties(hostsFileWriter);
hostsFileWriter.cleanup();
}
@After
public void teardown() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
/**
* Define admin properties for these datanodes as follows.
* dn0 and dn3 have upgrade domain ud1.
* dn1 and dn4 have upgrade domain ud2.
* dn2 and dn5 have upgrade domain ud3.
* dn0 and dn5 are decommissioned.
* Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
* rack2. Then any block's replicas should be on either
* {dn1, dn2, d3} or {dn2, dn3, dn4}.
*/
private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter)
throws IOException {
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
hosts.length];
for (int i = 0; i < hosts.length; i++) {
datanodes[i] = new DatanodeAdminProperties();
DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId();
datanodes[i].setHostName(datanodeID.getHostName());
datanodes[i].setPort(datanodeID.getXferPort());
datanodes[i].setUpgradeDomain(upgradeDomains[i]);
}
datanodes[0].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
datanodes[5].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
hostsFileWriter.initIncludeHosts(datanodes);
cluster.getFileSystem().refreshNodes();
expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId());
expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId());
}
@Test
public void testPlacement() throws Exception {
String clientMachine = "127.0.0.1";
for (int i = 0; i < 5; i++) {
String src = "/test-" + i;
// Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null, null);
assertEquals("Block should be allocated sufficient locations",
REPLICATION_FACTOR, locatedBlock.getLocations().length);
Set<DatanodeInfo> locs = new HashSet<>(Arrays.asList(
locatedBlock.getLocations()));
for (DatanodeID datanodeID : expectedDatanodeIDs) {
locs.contains(datanodeID);
}
nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
src, clientMachine);
}
}
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import static org.junit.Assert.assertTrue;
public class HostsFileWriter {
private FileSystem localFileSys;
private Path fullDir;
private Path excludeFile;
private Path includeFile;
private Path combinedFile;
private boolean isLegacyHostsFile = false;
public void initialize(Configuration conf, String dir) throws IOException {
localFileSys = FileSystem.getLocal(conf);
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
this.fullDir = new Path(workingDir, dir);
assertTrue(localFileSys.mkdirs(this.fullDir));
if (conf.getClass(
DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
HostFileManager.class, HostConfigManager.class).equals(
HostFileManager.class)) {
isLegacyHostsFile = true;
}
if (isLegacyHostsFile) {
excludeFile = new Path(fullDir, "exclude");
includeFile = new Path(fullDir, "include");
DFSTestUtil.writeFile(localFileSys, excludeFile, "");
DFSTestUtil.writeFile(localFileSys, includeFile, "");
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
} else {
combinedFile = new Path(fullDir, "all");
conf.set(DFSConfigKeys.DFS_HOSTS, combinedFile.toString());
}
}
public void initExcludeHost(String hostNameAndPort) throws IOException {
if (isLegacyHostsFile) {
DFSTestUtil.writeFile(localFileSys, excludeFile, hostNameAndPort);
} else {
DatanodeAdminProperties dn = new DatanodeAdminProperties();
String [] hostAndPort = hostNameAndPort.split(":");
dn.setHostName(hostAndPort[0]);
dn.setPort(Integer.parseInt(hostAndPort[1]));
dn.setAdminState(AdminStates.DECOMMISSIONED);
HashSet<DatanodeAdminProperties> allDNs = new HashSet<>();
allDNs.add(dn);
CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs);
}
}
public void initIncludeHosts(String[] hostNameAndPorts) throws IOException {
StringBuilder includeHosts = new StringBuilder();
if (isLegacyHostsFile) {
for(String hostNameAndPort : hostNameAndPorts) {
includeHosts.append(hostNameAndPort).append("\n");
}
DFSTestUtil.writeFile(localFileSys, includeFile,
includeHosts.toString());
} else {
HashSet<DatanodeAdminProperties> allDNs = new HashSet<>();
for(String hostNameAndPort : hostNameAndPorts) {
String[] hostAndPort = hostNameAndPort.split(":");
DatanodeAdminProperties dn = new DatanodeAdminProperties();
dn.setHostName(hostAndPort[0]);
dn.setPort(Integer.parseInt(hostAndPort[1]));
allDNs.add(dn);
}
CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs);
}
}
public void initIncludeHosts(DatanodeAdminProperties[] datanodes)
throws IOException {
CombinedHostsFileWriter.writeFile(combinedFile.toString(),
new HashSet<>(Arrays.asList(datanodes)));
}
public void cleanup() throws IOException {
if (localFileSys.exists(fullDir)) {
FileUtils.deleteQuietly(new File(fullDir.toUri().getPath()));
}
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.io.File;
import java.io.FileWriter;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/*
* Test for JSON based HostsFileReader
*/
public class TestCombinedHostsFileReader {
// Using /test/build/data/tmp directory to store temporary files
static final String HOSTS_TEST_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).getAbsolutePath();
File NEW_FILE = new File(HOSTS_TEST_DIR, "dfs.hosts.new.json");
static final String TEST_CACHE_DATA_DIR =
System.getProperty("test.cache.data", "build/test/cache");
File EXISTING_FILE = new File(TEST_CACHE_DATA_DIR, "dfs.hosts.json");
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
// Delete test file after running tests
NEW_FILE.delete();
}
/*
* Load the existing test json file
*/
@Test
public void testLoadExistingJsonFile() throws Exception {
Set<DatanodeAdminProperties> all =
CombinedHostsFileReader.readFile(EXISTING_FILE.getAbsolutePath());
assertEquals(5, all.size());
}
/*
* Test empty json config file
*/
@Test
public void testEmptyCombinedHostsFileReader() throws Exception {
FileWriter hosts = new FileWriter(NEW_FILE);
hosts.write("");
hosts.close();
Set<DatanodeAdminProperties> all =
CombinedHostsFileReader.readFile(NEW_FILE.getAbsolutePath());
assertEquals(0, all.size());
}
}

View File

@ -0,0 +1,5 @@
{"hostName": "host1"}
{"hostName": "host2", "upgradeDomain": "ud0"}
{"hostName": "host3", "adminState": "DECOMMISSIONED"}
{"hostName": "host4", "upgradeDomain": "ud2", "adminState": "DECOMMISSIONED"}
{"hostName": "host5", "port": 8090}