From c4c5533216eaaa64731a6f0c1bc9be9b1e91f7d6 Mon Sep 17 00:00:00 2001 From: Ming Ma Date: Tue, 2 May 2017 06:53:32 -0700 Subject: [PATCH] HDFS-9005. Provide configuration support for upgrade domain. --- .../protocol/DatanodeAdminProperties.java | 100 +++++++ .../hadoop/hdfs/protocol/DatanodeID.java | 6 + .../hdfs/util/CombinedHostsFileReader.java | 76 ++++++ .../hdfs/util/CombinedHostsFileWriter.java | 69 +++++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +- .../CombinedHostFileManager.java | 250 ++++++++++++++++++ .../blockmanagement/DatanodeManager.java | 59 +++-- .../blockmanagement/HostConfigManager.java | 80 ++++++ .../blockmanagement/HostFileManager.java | 147 +++------- .../hdfs/server/blockmanagement/HostSet.java | 114 ++++++++ .../src/main/resources/hdfs-default.xml | 15 ++ .../src/site/markdown/HdfsUserGuide.md | 6 +- .../hadoop/hdfs/TestDatanodeReport.java | 56 +++- .../TestBlocksWithNotEnoughRacks.java | 33 +-- .../blockmanagement/TestDatanodeManager.java | 8 +- .../blockmanagement/TestHostFileManager.java | 10 +- .../hdfs/server/namenode/TestHostsFiles.java | 70 +++-- .../server/namenode/TestNameNodeMXBean.java | 25 +- .../hdfs/server/namenode/TestStartup.java | 51 +--- ...TestUpgradeDomainBlockPlacementPolicy.java | 169 ++++++++++++ .../hadoop/hdfs/util/HostsFileWriter.java | 122 +++++++++ .../util/TestCombinedHostsFileReader.java | 79 ++++++ .../src/test/resources/dfs.hosts.json | 5 + 23 files changed, 1290 insertions(+), 264 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java new file mode 100644 index 00000000000..9f7b98309d1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; + +/** + * The class describes the configured admin properties for a datanode. + * + * It is the static configuration specified by administrators via dfsadmin + * command; different from the runtime state. CombinedHostFileManager uses + * the class to deserialize the configurations from json-based file format. + * + * To decommission a node, use AdminStates.DECOMMISSIONED. + */ +public class DatanodeAdminProperties { + private String hostName; + private int port; + private String upgradeDomain; + private AdminStates adminState = AdminStates.NORMAL; + + /** + * Return the host name of the datanode. + * @return the host name of the datanode. + */ + public String getHostName() { + return hostName; + } + + /** + * Set the host name of the datanode. + * @param hostName the host name of the datanode. + */ + public void setHostName(final String hostName) { + this.hostName = hostName; + } + + /** + * Get the port number of the datanode. + * @return the port number of the datanode. + */ + public int getPort() { + return port; + } + + /** + * Set the port number of the datanode. + * @param port the port number of the datanode. + */ + public void setPort(final int port) { + this.port = port; + } + + /** + * Get the upgrade domain of the datanode. + * @return the upgrade domain of the datanode. + */ + public String getUpgradeDomain() { + return upgradeDomain; + } + + /** + * Set the upgrade domain of the datanode. + * @param upgradeDomain the upgrade domain of the datanode. + */ + public void setUpgradeDomain(final String upgradeDomain) { + this.upgradeDomain = upgradeDomain; + } + + /** + * Get the admin state of the datanode. + * @return the admin state of the datanode. + */ + public AdminStates getAdminState() { + return adminState; + } + + /** + * Set the admin state of the datanode. + * @param adminState the admin state of the datanode. + */ + public void setAdminState(final AdminStates adminState) { + this.adminState = adminState; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index 86782f25501..e94c07d1a4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceStability; import com.google.common.annotations.VisibleForTesting; +import java.net.InetSocketAddress; + /** * This class represents the primary identifier for a Datanode. 
* Datanodes are identified by how they can be contacted (hostname @@ -272,4 +274,8 @@ public class DatanodeID implements Comparable { public int compareTo(DatanodeID that) { return getXferAddr().compareTo(that.getXferAddr()); } + + public InetSocketAddress getResolvedAddress() { + return new InetSocketAddress(this.getIpAddr(), this.getXferPort()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java new file mode 100644 index 00000000000..33acb91f837 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.util; + +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.Reader; + +import java.util.Iterator; +import java.util.Set; +import java.util.HashSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; + +/** + * Reader support for JSON based datanode configuration, an alternative + * to the exclude/include files configuration. + * The JSON file format is the array of elements where each element + * in the array describes the properties of a datanode. The properties of + * a datanode is defined in {@link DatanodeAdminProperties}. For example, + * + * {"hostName": "host1"} + * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"} + * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"} + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Unstable +public final class CombinedHostsFileReader { + private CombinedHostsFileReader() { + } + + /** + * Deserialize a set of DatanodeAdminProperties from a json file. + * @param hostsFile the input json file to read from. 
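+ * Entries are parsed as a stream of JSON values read from a UTF-8 encoded
+ * file, typically one entry per line as in the example above.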
+ * @return the set of DatanodeAdminProperties + * @throws IOException + */ + public static Set + readFile(final String hostsFile) throws IOException { + HashSet allDNs = new HashSet<>(); + ObjectMapper mapper = new ObjectMapper(); + try (Reader input = + new InputStreamReader(new FileInputStream(hostsFile), "UTF-8")) { + Iterator iterator = + mapper.readValues(new JsonFactory().createJsonParser(input), + DatanodeAdminProperties.class); + while (iterator.hasNext()) { + DatanodeAdminProperties properties = iterator.next(); + allDNs.add(properties); + } + } + return allDNs; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java new file mode 100644 index 00000000000..ea70be2eb70 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.util; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; + +/** + * Writer support for JSON based datanode configuration, an alternative + * to the exclude/include files configuration. + * The JSON file format is the array of elements where each element + * in the array describes the properties of a datanode. The properties of + * a datanode is defined in {@link DatanodeAdminProperties}. For example, + * + * {"hostName": "host1"} + * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"} + * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"} + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Unstable +public final class CombinedHostsFileWriter { + private CombinedHostsFileWriter() { + } + + /** + * Serialize a set of DatanodeAdminProperties to a json file. + * @param hostsFile the json file name. 
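+ * Any existing content of the file is overwritten; the entries are written
+ * out back to back in a form CombinedHostsFileReader can parse.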
+ * @param allDNs the set of DatanodeAdminProperties + * @throws IOException + */ + public static void writeFile(final String hostsFile, + final Set allDNs) throws IOException { + StringBuilder configs = new StringBuilder(); + try (Writer output = + new OutputStreamWriter(new FileOutputStream(hostsFile), "UTF-8")) { + for (DatanodeAdminProperties datanodeAdminProperties: allDNs) { + ObjectMapper mapper = new ObjectMapper(); + configs.append(mapper.writeValueAsString(datanodeAdminProperties)); + } + output.write(configs.toString()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b62010b7996..51a75cd9440 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -426,12 +426,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals"; public static final String DFS_DATANODE_HOST_NAME_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY; - public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts"; - public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude"; public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY; public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY; + public static final String DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY = + "dfs.namenode.hosts.provider.classname"; public static final String DFS_HOSTS = "dfs.hosts"; public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude"; public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java new file mode 100644 index 00000000000..3e913b93a25 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.UnmodifiableIterator; +import com.google.common.collect.Iterables; +import com.google.common.collect.Collections2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; + +import java.io.IOException; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Predicate; + +import org.apache.hadoop.hdfs.util.CombinedHostsFileReader; + +/** + * This class manages datanode configuration using a json file. + * Please refer to {@link CombinedHostsFileReader} for the json format. + *
<p/> + * <p/>
+ * Entries may or may not specify a port. If they don't, we consider + * them to apply to every DataNode on that host. The code canonicalizes the + * entries into IP addresses. + *
<p/> + * <p/>
+ * The code ignores all entries that the DNS fails to resolve their IP + * addresses. This is okay because by default the NN rejects the registrations + * of DNs when it fails to do a forward and reverse lookup. Note that DNS + * resolutions are only done during the loading time to minimize the latency. + */ +public class CombinedHostFileManager extends HostConfigManager { + private static final Logger LOG = LoggerFactory.getLogger( + CombinedHostFileManager.class); + private Configuration conf; + private HostProperties hostProperties = new HostProperties(); + + static class HostProperties { + private Multimap allDNs = + HashMultimap.create(); + // optimization. If every node in the file isn't in service, it implies + // any node is allowed to register with nn. This is equivalent to having + // an empty "include" file. + private boolean emptyInServiceNodeLists = true; + synchronized void add(InetAddress addr, + DatanodeAdminProperties properties) { + allDNs.put(addr, properties); + if (properties.getAdminState().equals( + AdminStates.NORMAL)) { + emptyInServiceNodeLists = false; + } + } + + // If the includes list is empty, act as if everything is in the + // includes list. + synchronized boolean isIncluded(final InetSocketAddress address) { + return emptyInServiceNodeLists || Iterables.any( + allDNs.get(address.getAddress()), + new Predicate() { + public boolean apply(DatanodeAdminProperties input) { + return input.getPort() == 0 || + input.getPort() == address.getPort(); + } + }); + } + + synchronized boolean isExcluded(final InetSocketAddress address) { + return Iterables.any(allDNs.get(address.getAddress()), + new Predicate() { + public boolean apply(DatanodeAdminProperties input) { + return input.getAdminState().equals( + AdminStates.DECOMMISSIONED) && + (input.getPort() == 0 || + input.getPort() == address.getPort()); + } + }); + } + + synchronized String getUpgradeDomain(final InetSocketAddress address) { + Iterable datanode = Iterables.filter( + allDNs.get(address.getAddress()), + new Predicate() { + public boolean apply(DatanodeAdminProperties input) { + return (input.getPort() == 0 || + input.getPort() == address.getPort()); + } + }); + return datanode.iterator().hasNext() ? 
+ datanode.iterator().next().getUpgradeDomain() : null; + } + + Iterable getIncludes() { + return new Iterable() { + @Override + public Iterator iterator() { + return new HostIterator(allDNs.entries()); + } + }; + } + + Iterable getExcludes() { + return new Iterable() { + @Override + public Iterator iterator() { + return new HostIterator( + Collections2.filter(allDNs.entries(), + new Predicate>() { + public boolean apply(java.util.Map.Entry entry) { + return entry.getValue().getAdminState().equals( + AdminStates.DECOMMISSIONED); + } + } + )); + } + }; + } + + static class HostIterator extends UnmodifiableIterator { + private final Iterator> it; + public HostIterator(Collection> nodes) { + this.it = nodes.iterator(); + } + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public InetSocketAddress next() { + Map.Entry e = it.next(); + return new InetSocketAddress(e.getKey(), e.getValue().getPort()); + } + } + } + + @Override + public Iterable getIncludes() { + return hostProperties.getIncludes(); + } + + @Override + public Iterable getExcludes() { + return hostProperties.getExcludes(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void refresh() throws IOException { + refresh(conf.get(DFSConfigKeys.DFS_HOSTS, "")); + } + private void refresh(final String hostsFile) throws IOException { + HostProperties hostProps = new HostProperties(); + Set all = + CombinedHostsFileReader.readFile(hostsFile); + for(DatanodeAdminProperties properties : all) { + InetSocketAddress addr = parseEntry(hostsFile, + properties.getHostName(), properties.getPort()); + if (addr != null) { + hostProps.add(addr.getAddress(), properties); + } + } + refresh(hostProps); + } + + @VisibleForTesting + static InetSocketAddress parseEntry(final String fn, final String hostName, + final int port) { + InetSocketAddress addr = new InetSocketAddress(hostName, port); + if (addr.isUnresolved()) { + LOG.warn("Failed to resolve {} in {}. ", hostName, fn); + return null; + } + return addr; + } + + @Override + public synchronized boolean isIncluded(final DatanodeID dn) { + return hostProperties.isIncluded(dn.getResolvedAddress()); + } + + @Override + public synchronized boolean isExcluded(final DatanodeID dn) { + return isExcluded(dn.getResolvedAddress()); + } + + private boolean isExcluded(final InetSocketAddress address) { + return hostProperties.isExcluded(address); + } + + @Override + public synchronized String getUpgradeDomain(final DatanodeID dn) { + return hostProperties.getUpgradeDomain(dn.getResolvedAddress()); + } + + /** + * Set the properties lists by the new instances. The + * old instance is discarded. 
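+ * The swap is done under this object's monitor, so the synchronized
+ * isIncluded/isExcluded/getUpgradeDomain callers always see a fully
+ * loaded snapshot.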
+ * @param hostProperties the new properties list + */ + @VisibleForTesting + private void refresh(final HostProperties hostProperties) { + synchronized (this) { + this.hostProperties = hostProperties; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index ef4a47a0312..eeda5b1beaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -105,7 +105,7 @@ public class DatanodeManager { private final int defaultIpcPort; /** Read include/exclude files*/ - private final HostFileManager hostFileManager = new HostFileManager(); + private HostConfigManager hostConfigManager; /** The period to wait for datanode heartbeat.*/ private long heartbeatExpireInterval; @@ -198,9 +198,11 @@ public class DatanodeManager { this.defaultIpcPort = NetUtils.createSocketAddr( conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); + this.hostConfigManager = ReflectionUtils.newInstance( + conf.getClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + HostFileManager.class, HostConfigManager.class), conf); try { - this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""), - conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "")); + this.hostConfigManager.refresh(); } catch (IOException e) { LOG.error("error reading hosts files: ", e); } @@ -218,7 +220,7 @@ public class DatanodeManager { // in the cache; so future calls to resolve will be fast. if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { final ArrayList locations = new ArrayList<>(); - for (InetSocketAddress addr : hostFileManager.getIncludes()) { + for (InetSocketAddress addr : hostConfigManager.getIncludes()) { locations.add(addr.getAddress().getHostAddress()); } dnsToSwitchMapping.resolve(locations); @@ -331,8 +333,8 @@ public class DatanodeManager { return decomManager; } - HostFileManager getHostFileManager() { - return hostFileManager; + public HostConfigManager getHostConfigManager() { + return hostConfigManager; } @VisibleForTesting @@ -622,6 +624,7 @@ public class DatanodeManager { networktopology.add(node); // may throw InvalidTopologyException host2DatanodeMap.add(node); checkIfClusterIsNowMultiRack(node); + resolveUpgradeDomain(node); blockManager.getBlockReportLeaseManager().register(node); if (LOG.isDebugEnabled()) { @@ -706,7 +709,14 @@ public class DatanodeManager { return new HashMap<> (this.datanodesSoftwareVersions); } } - + + void resolveUpgradeDomain(DatanodeDescriptor node) { + String upgradeDomain = hostConfigManager.getUpgradeDomain(node); + if (upgradeDomain != null && upgradeDomain.length() > 0) { + node.setUpgradeDomain(upgradeDomain); + } + } + /** * Resolve a node's network location. If the DNS to switch mapping fails * then this method guarantees default rack location. 
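The constructor hunk above instantiates the provider purely by class name, so any subclass of the new HostConfigManager abstract class (added later in this patch) can be plugged in through dfs.namenode.hosts.provider.classname. The following is an illustrative sketch only, not part of the patch; the class name, hostnames and upgrade domain values are invented. It shows a trivial provider that admits every datanode and serves upgrade domains from a hard-coded map:

    // Hypothetical example, not part of HDFS-9005: a minimal HostConfigManager
    // that admits every datanode and serves upgrade domains from a static map.
    package org.apache.hadoop.hdfs.server.blockmanagement;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;

    public class StaticUpgradeDomainManager extends HostConfigManager {
      private Configuration conf;
      private final Map<String, String> upgradeDomains = new HashMap<>();

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public void refresh() throws IOException {
        // Called by the NameNode on startup and on "dfsadmin -refreshNodes";
        // a real provider would reload its backing store here.
        upgradeDomains.put("dn1.example.com", "ud0");
        upgradeDomains.put("dn2.example.com", "ud1");
      }

      @Override
      public Iterable<InetSocketAddress> getIncludes() {
        // Nothing listed explicitly; isIncluded() below admits every node.
        return Collections.<InetSocketAddress>emptyList();
      }

      @Override
      public Iterable<InetSocketAddress> getExcludes() {
        // No node is marked for decommission.
        return Collections.<InetSocketAddress>emptyList();
      }

      @Override
      public boolean isIncluded(DatanodeID dn) {
        return true;
      }

      @Override
      public boolean isExcluded(DatanodeID dn) {
        return false;
      }

      @Override
      public String getUpgradeDomain(DatanodeID dn) {
        return upgradeDomains.get(dn.getHostName());
      }
    }

Such a provider would be selected by setting dfs.namenode.hosts.provider.classname in hdfs-site.xml to its fully qualified class name; the patch's CombinedHostFileManager is enabled the same way, with dfs.hosts then pointing at the JSON hosts file instead of a plain hostname list.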
@@ -836,7 +846,7 @@ public class DatanodeManager { */ void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) { // If the registered node is in exclude list, then decommission it - if (getHostFileManager().isExcluded(nodeReg)) { + if (getHostConfigManager().isExcluded(nodeReg)) { decomManager.startDecommission(nodeReg); } } @@ -876,7 +886,7 @@ public class DatanodeManager { // Checks if the node is not on the hosts list. If it is not, then // it will be disallowed from registering. - if (!hostFileManager.isIncluded(nodeReg)) { + if (!hostConfigManager.isIncluded(nodeReg)) { throw new DisallowedDatanodeException(nodeReg); } @@ -944,7 +954,8 @@ public class DatanodeManager { getNetworkDependenciesWithDefault(nodeS)); } getNetworkTopology().add(nodeS); - + resolveUpgradeDomain(nodeS); + // also treat the registration message as a heartbeat heartbeatManager.register(nodeS); incrementVersionCount(nodeS.getSoftwareVersion()); @@ -976,7 +987,8 @@ public class DatanodeManager { } networktopology.add(nodeDescr); nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion()); - + resolveUpgradeDomain(nodeDescr); + // register new datanode addDatanode(nodeDescr); // also treat the registration message as a heartbeat @@ -1030,9 +1042,9 @@ public class DatanodeManager { // Update the file names and refresh internal includes and excludes list. if (conf == null) { conf = new HdfsConfiguration(); + this.hostConfigManager.setConf(conf); } - this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""), - conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "")); + this.hostConfigManager.refresh(); } /** @@ -1044,15 +1056,16 @@ public class DatanodeManager { private void refreshDatanodes() { for(DatanodeDescriptor node : datanodeMap.values()) { // Check if not include. - if (!hostFileManager.isIncluded(node)) { + if (!hostConfigManager.isIncluded(node)) { node.setDisallowed(true); // case 2. } else { - if (hostFileManager.isExcluded(node)) { + if (hostConfigManager.isExcluded(node)) { decomManager.startDecommission(node); // case 3. } else { decomManager.stopDecommission(node); // case 4. } } + node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node)); } } @@ -1260,9 +1273,9 @@ public class DatanodeManager { type == DatanodeReportType.DECOMMISSIONING; ArrayList nodes; - final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet(); - final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes(); - final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes(); + final HostSet foundNodes = new HostSet(); + final Iterable includedNodes = + hostConfigManager.getIncludes(); synchronized(datanodeMap) { nodes = new ArrayList<>(datanodeMap.size()); @@ -1273,11 +1286,11 @@ public class DatanodeManager { if (((listLiveNodes && !isDead) || (listDeadNodes && isDead) || (listDecommissioningNodes && isDecommissioning)) && - hostFileManager.isIncluded(dn)) { + hostConfigManager.isIncluded(dn)) { nodes.add(dn); } - foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn)); + foundNodes.add(dn.getResolvedAddress()); } } Collections.sort(nodes); @@ -1301,7 +1314,7 @@ public class DatanodeManager { addr.getPort() == 0 ? 
defaultXferPort : addr.getPort(), defaultInfoPort, defaultInfoSecurePort, defaultIpcPort)); setDatanodeDead(dn); - if (excludedNodes.match(addr)) { + if (hostConfigManager.isExcluded(dn)) { dn.setDecommissioned(); } nodes.add(dn); @@ -1310,8 +1323,8 @@ public class DatanodeManager { if (LOG.isDebugEnabled()) { LOG.debug("getDatanodeListForReport with " + - "includedNodes = " + hostFileManager.getIncludes() + - ", excludedNodes = " + hostFileManager.getExcludes() + + "includedNodes = " + hostConfigManager.getIncludes() + + ", excludedNodes = " + hostConfigManager.getExcludes() + ", foundNodes = " + foundNodes + ", nodes = " + nodes); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java new file mode 100644 index 00000000000..f28ed2997a1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hdfs.protocol.DatanodeID; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * This interface abstracts how datanode configuration is managed. + * + * Each implementation defines its own way to persist the configuration. + * For example, it can use one JSON file to store the configs for all + * datanodes; or it can use one file to store in-service datanodes and another + * file to store decommission-requested datanodes. + * + * These files control which DataNodes the NameNode expects to see in the + * cluster. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class HostConfigManager implements Configurable { + + /** + * Return all the datanodes that are allowed to connect to the namenode. + * @return Iterable of all datanodes + */ + public abstract Iterable getIncludes(); + + /** + * Return all datanodes that should be in decommissioned state. + * @return Iterable of those datanodes + */ + public abstract Iterable getExcludes(); + + /** + * Check if a datanode is allowed to connect the namenode. + * @param dn the DatanodeID of the datanode + * @return boolean if dn is allowed to connect the namenode. + */ + public abstract boolean isIncluded(DatanodeID dn); + + /** + * Check if a datanode needs to be decommissioned. + * @param dn the DatanodeID of the datanode + * @return boolean if dn needs to be decommissioned. 
+ */ + public abstract boolean isExcluded(DatanodeID dn); + + /** + * Reload the configuration. + */ + public abstract void refresh() throws IOException; + + /** + * Get the upgrade domain of a datanode. + * @param dn the DatanodeID of the datanode + * @return the upgrade domain of dn. + */ + public abstract String getUpgradeDomain(DatanodeID dn); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java index e05ef9a4047..bcfebf25de1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java @@ -18,28 +18,18 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterators; -import com.google.common.collect.Multimap; -import com.google.common.collect.UnmodifiableIterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.util.HostsFileReader; -import javax.annotation.Nullable; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; -import java.util.Collection; import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; /** * This class manages the include and exclude files for HDFS. @@ -59,11 +49,27 @@ import java.util.Map; * of DNs when it fails to do a forward and reverse lookup. Note that DNS * resolutions are only done during the loading time to minimize the latency. */ -class HostFileManager { +public class HostFileManager extends HostConfigManager { private static final Log LOG = LogFactory.getLog(HostFileManager.class); + private Configuration conf; private HostSet includes = new HostSet(); private HostSet excludes = new HostSet(); + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void refresh() throws IOException { + refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""), + conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "")); + } private static HostSet readFile(String type, String filename) throws IOException { HostSet res = new HostSet(); @@ -99,31 +105,37 @@ class HostFileManager { return null; } - static InetSocketAddress resolvedAddressFromDatanodeID(DatanodeID id) { - return new InetSocketAddress(id.getIpAddr(), id.getXferPort()); - } - - synchronized HostSet getIncludes() { + @Override + public synchronized HostSet getIncludes() { return includes; } - synchronized HostSet getExcludes() { + @Override + public synchronized HostSet getExcludes() { return excludes; } // If the includes list is empty, act as if everything is in the // includes list. 
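// (CombinedHostFileManager keeps the same convention through its
// HostProperties#emptyInServiceNodeLists flag.)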
- synchronized boolean isIncluded(DatanodeID dn) { - return includes.isEmpty() || includes.match - (resolvedAddressFromDatanodeID(dn)); + @Override + public synchronized boolean isIncluded(DatanodeID dn) { + return includes.isEmpty() || includes.match(dn.getResolvedAddress()); } - synchronized boolean isExcluded(DatanodeID dn) { - return excludes.match(resolvedAddressFromDatanodeID(dn)); + @Override + public synchronized boolean isExcluded(DatanodeID dn) { + return isExcluded(dn.getResolvedAddress()); } - synchronized boolean hasIncludes() { - return !includes.isEmpty(); + private boolean isExcluded(InetSocketAddress address) { + return excludes.match(address); + } + + @Override + public synchronized String getUpgradeDomain(final DatanodeID dn) { + // The include/exclude files based config doesn't support upgrade domain + // config. + return null; } /** @@ -133,7 +145,8 @@ class HostFileManager { * @param excludeFile the path to the new excludes list * @throws IOException thrown if there is a problem reading one of the files */ - void refresh(String includeFile, String excludeFile) throws IOException { + private void refresh(String includeFile, String excludeFile) + throws IOException { HostSet newIncludes = readFile("included", includeFile); HostSet newExcludes = readFile("excluded", excludeFile); @@ -153,84 +166,4 @@ class HostFileManager { excludes = newExcludes; } } - - /** - * The HostSet allows efficient queries on matching wildcard addresses. - *
<p/>
- * For InetSocketAddress A and B with the same host address, - * we define a partial order between A and B, A <= B iff A.getPort() == B - * .getPort() || B.getPort() == 0. - */ - static class HostSet implements Iterable { - // Host -> lists of ports - private final Multimap addrs = HashMultimap.create(); - - /** - * The function that checks whether there exists an entry foo in the set - * so that foo <= addr. - */ - boolean matchedBy(InetSocketAddress addr) { - Collection ports = addrs.get(addr.getAddress()); - return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr - .getPort()); - } - - /** - * The function that checks whether there exists an entry foo in the set - * so that addr <= foo. - */ - boolean match(InetSocketAddress addr) { - int port = addr.getPort(); - Collection ports = addrs.get(addr.getAddress()); - boolean exactMatch = ports.contains(port); - boolean genericMatch = ports.contains(0); - return exactMatch || genericMatch; - } - - boolean isEmpty() { - return addrs.isEmpty(); - } - - int size() { - return addrs.size(); - } - - void add(InetSocketAddress addr) { - Preconditions.checkArgument(!addr.isUnresolved()); - addrs.put(addr.getAddress(), addr.getPort()); - } - - @Override - public Iterator iterator() { - return new UnmodifiableIterator() { - private final Iterator> it = addrs.entries().iterator(); - - @Override - public boolean hasNext() { - return it.hasNext(); - } - - @Override - public InetSocketAddress next() { - Map.Entry e = it.next(); - return new InetSocketAddress(e.getKey(), e.getValue()); - } - }; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder("HostSet("); - Joiner.on(",").appendTo(sb, Iterators.transform(iterator(), - new Function() { - @Override - public String apply(@Nullable InetSocketAddress addr) { - assert addr != null; - return addr.getAddress().getHostAddress() + ":" + addr.getPort(); - } - })); - return sb.append(")").toString(); - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java new file mode 100644 index 00000000000..958557b4f8a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Multimap; +import com.google.common.collect.UnmodifiableIterator; + +import javax.annotation.Nullable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; + + +/** + * The HostSet allows efficient queries on matching wildcard addresses. + *
<p/>
+ * For InetSocketAddress A and B with the same host address, + * we define a partial order between A and B, A <= B iff A.getPort() == B + * .getPort() || B.getPort() == 0. + */ +public class HostSet implements Iterable { + // Host -> lists of ports + private final Multimap addrs = HashMultimap.create(); + + /** + * The function that checks whether there exists an entry foo in the set + * so that foo <= addr. + */ + boolean matchedBy(InetSocketAddress addr) { + Collection ports = addrs.get(addr.getAddress()); + return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr + .getPort()); + } + + /** + * The function that checks whether there exists an entry foo in the set + * so that addr <= foo. + */ + boolean match(InetSocketAddress addr) { + int port = addr.getPort(); + Collection ports = addrs.get(addr.getAddress()); + boolean exactMatch = ports.contains(port); + boolean genericMatch = ports.contains(0); + return exactMatch || genericMatch; + } + + boolean isEmpty() { + return addrs.isEmpty(); + } + + int size() { + return addrs.size(); + } + + void add(InetSocketAddress addr) { + Preconditions.checkArgument(!addr.isUnresolved()); + addrs.put(addr.getAddress(), addr.getPort()); + } + + @Override + public Iterator iterator() { + return new UnmodifiableIterator() { + private final Iterator> it = addrs.entries().iterator(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public InetSocketAddress next() { + Map.Entry e = it.next(); + return new InetSocketAddress(e.getKey(), e.getValue()); + } + }; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("HostSet("); + Joiner.on(",").appendTo(sb, Iterators.transform(iterator(), + new Function() { + @Override + public String apply(@Nullable InetSocketAddress addr) { + assert addr != null; + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + })); + return sb.append(")").toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 50d09c0b183..8ab785f440a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3107,4 +3107,19 @@ The size buffer to be used when creating or opening httpfs filesystem IO stream. + + + dfs.namenode.hosts.provider.classname + org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager + + The class that provides access for host files. + org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager is used + by default which loads files specified by dfs.hosts and dfs.hosts.exclude. + If org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager is + used, it will load the JSON file defined in dfs.hosts. + To change class name, nn restart is required. "dfsadmin -refreshNodes" only + refreshes the configuration files used by the class. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md index 9c6bd026ed3..28957fbf602 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md @@ -142,12 +142,16 @@ The `bin/hdfs dfsadmin` command supports a few HDFS administration related opera during last upgrade. * `-refreshNodes`: Updates the namenode with the set of datanodes - allowed to connect to the namenode. 
Namenodes re-read datanode + allowed to connect to the namenode. By default, Namenodes re-read datanode hostnames in the file defined by `dfs.hosts`, `dfs.hosts.exclude` Hosts defined in `dfs.hosts` are the datanodes that are part of the cluster. If there are entries in `dfs.hosts`, only the hosts in it are allowed to register with the namenode. Entries in `dfs.hosts.exclude` are datanodes that need to be decommissioned. + Alternatively if `dfs.namenode.hosts.provider.classname` is set to + `org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager`, + all include and exclude hosts are specified in the JSON file defined by + `dfs.hosts`. Datanodes complete decommissioning when all the replicas from them are replicated to other datanodes. Decommissioned nodes are not automatically shutdown and are not chosen for writing for new diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java index 81382985964..c990e0fe76c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java @@ -29,15 +29,19 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.junit.Assert; import org.junit.Test; @@ -48,7 +52,57 @@ public class TestDatanodeReport { static final Log LOG = LogFactory.getLog(TestDatanodeReport.class); final static private Configuration conf = new HdfsConfiguration(); final static private int NUM_OF_DATANODES = 4; - + + /** + * This test verifies upgrade domain is set according to the JSON host file. 
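+ * The include file written below holds a JSON entry for the datanode, roughly
+ * {"hostName": "<dn host>", "port": 0, "upgradeDomain": "ud1", "adminState": "NORMAL"},
+ * and refreshNodes() makes the namenode pick the upgrade domain up through
+ * CombinedHostFileManager.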
+ */ + @Test + public void testDatanodeReportWithUpgradeDomain() throws Exception { + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); // 0.5s + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + CombinedHostFileManager.class, HostConfigManager.class); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(conf, "temp/datanodeReport"); + + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + final DFSClient client = cluster.getFileSystem().dfs; + final String ud1 = "ud1"; + final String ud2 = "ud2"; + + try { + //wait until the cluster is up + cluster.waitActive(); + + DatanodeAdminProperties datanode = new DatanodeAdminProperties(); + datanode.setHostName(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); + datanode.setUpgradeDomain(ud1); + hostsFileWriter.initIncludeHosts( + new DatanodeAdminProperties[]{datanode}); + client.refreshNodes(); + DatanodeInfo[] all = client.datanodeReport(DatanodeReportType.ALL); + assertEquals(all[0].getUpgradeDomain(), ud1); + + datanode.setUpgradeDomain(null); + hostsFileWriter.initIncludeHosts( + new DatanodeAdminProperties[]{datanode}); + client.refreshNodes(); + all = client.datanodeReport(DatanodeReportType.ALL); + assertEquals(all[0].getUpgradeDomain(), null); + + datanode.setUpgradeDomain(ud2); + hostsFileWriter.initIncludeHosts( + new DatanodeAdminProperties[]{datanode}); + client.refreshNodes(); + all = client.datanodeReport(DatanodeReportType.ALL); + assertEquals(all[0].getUpgradeDomain(), ud2); + } finally { + cluster.shutdown(); + } + } + /** * This test attempts to different types of datanode report. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java index 3d00e269c80..0b403d1b3a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.Test; @@ -384,17 +385,8 @@ public class TestBlocksWithNotEnoughRacks { short REPLICATION_FACTOR = 2; final Path filePath = new Path("/testFile"); - // Configure an excludes file - FileSystem localFileSys = FileSystem.getLocal(conf); - Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); - Path dir = new Path(workingDir, "temp/decommission"); - Path excludeFile = new Path(dir, "exclude"); - Path includeFile = new Path(dir, "include"); - assertTrue(localFileSys.mkdirs(dir)); - DFSTestUtil.writeFile(localFileSys, excludeFile, ""); - DFSTestUtil.writeFile(localFileSys, includeFile, ""); - conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); - conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + 
hostsFileWriter.initialize(conf, "temp/decommission"); // Two blocks and four racks String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"}; @@ -415,7 +407,7 @@ public class TestBlocksWithNotEnoughRacks { BlockLocation locs[] = fs.getFileBlockLocations( fs.getFileStatus(filePath), 0, Long.MAX_VALUE); String name = locs[0].getNames()[0]; - DFSTestUtil.writeFile(localFileSys, excludeFile, name); + hostsFileWriter.initExcludeHost(name); ns.getBlockManager().getDatanodeManager().refreshNodes(conf); DFSTestUtil.waitForDecommission(fs, name); @@ -423,6 +415,7 @@ public class TestBlocksWithNotEnoughRacks { DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0); } finally { cluster.shutdown(); + hostsFileWriter.cleanup(); } } @@ -437,17 +430,8 @@ public class TestBlocksWithNotEnoughRacks { short REPLICATION_FACTOR = 5; final Path filePath = new Path("/testFile"); - // Configure an excludes file - FileSystem localFileSys = FileSystem.getLocal(conf); - Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); - Path dir = new Path(workingDir, "temp/decommission"); - Path excludeFile = new Path(dir, "exclude"); - Path includeFile = new Path(dir, "include"); - assertTrue(localFileSys.mkdirs(dir)); - DFSTestUtil.writeFile(localFileSys, excludeFile, ""); - DFSTestUtil.writeFile(localFileSys, includeFile, ""); - conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); - conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(conf, "temp/decommission"); // All hosts are on two racks, only one host on /rack2 String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"}; @@ -473,7 +457,7 @@ public class TestBlocksWithNotEnoughRacks { for (String top : locs[0].getTopologyPaths()) { if (!top.startsWith("/rack2")) { String name = top.substring("/rack1".length()+1); - DFSTestUtil.writeFile(localFileSys, excludeFile, name); + hostsFileWriter.initExcludeHost(name); ns.getBlockManager().getDatanodeManager().refreshNodes(conf); DFSTestUtil.waitForDecommission(fs, name); break; @@ -485,6 +469,7 @@ public class TestBlocksWithNotEnoughRacks { DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0); } finally { cluster.shutdown(); + hostsFileWriter.cleanup(); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index a8a1db900fe..30e2aaf7fd0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -420,9 +420,9 @@ public class TestDatanodeManager { DatanodeManager dm = mockDatanodeManager(fsn, new Configuration()); HostFileManager hm = new HostFileManager(); - HostFileManager.HostSet noNodes = new HostFileManager.HostSet(); - HostFileManager.HostSet oneNode = new HostFileManager.HostSet(); - HostFileManager.HostSet twoNodes = new HostFileManager.HostSet(); + HostSet noNodes = new HostSet(); + HostSet oneNode = new HostSet(); + HostSet twoNodes = new HostSet(); DatanodeRegistration dr1 = new DatanodeRegistration( new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123", 12345, 12345, 12345, 12345), @@ -439,7 +439,7 @@ public class TestDatanodeManager { 
oneNode.add(entry("127.0.0.1:23456")); hm.refresh(twoNodes, noNodes); - Whitebox.setInternalState(dm, "hostFileManager", hm); + Whitebox.setInternalState(dm, "hostConfigManager", hm); // Register two data nodes to simulate them coming up. // We need to add two nodes, because if we have only one node, removing it diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java index 6f17040ef68..e6be7cb1727 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java @@ -40,7 +40,7 @@ public class TestHostFileManager { @Test public void testDeduplication() { - HostFileManager.HostSet s = new HostFileManager.HostSet(); + HostSet s = new HostSet(); // These entries will be de-duped, since they refer to the same IP // address + port combo. s.add(entry("127.0.0.1:12345")); @@ -60,7 +60,7 @@ public class TestHostFileManager { @Test public void testRelation() { - HostFileManager.HostSet s = new HostFileManager.HostSet(); + HostSet s = new HostSet(); s.add(entry("127.0.0.1:123")); Assert.assertTrue(s.match(entry("127.0.0.1:123"))); Assert.assertFalse(s.match(entry("127.0.0.1:12"))); @@ -105,8 +105,8 @@ public class TestHostFileManager { FSNamesystem fsn = mock(FSNamesystem.class); Configuration conf = new Configuration(); HostFileManager hm = new HostFileManager(); - HostFileManager.HostSet includedNodes = new HostFileManager.HostSet(); - HostFileManager.HostSet excludedNodes = new HostFileManager.HostSet(); + HostSet includedNodes = new HostSet(); + HostSet excludedNodes = new HostSet(); includedNodes.add(entry("127.0.0.1:12345")); includedNodes.add(entry("localhost:12345")); @@ -122,7 +122,7 @@ public class TestHostFileManager { hm.refresh(includedNodes, excludedNodes); DatanodeManager dm = new DatanodeManager(bm, fsn, conf); - Whitebox.setInternalState(dm, "hostFileManager", hm); + Whitebox.setInternalState(dm, "hostConfigManager", hm); Map dnMap = (Map) Whitebox.getInternalState(dm, "datanodeMap"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java index bf4d8ff1889..0ac968a1175 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java @@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.junit.Assert.assertTrue; import java.lang.management.ManagementFactory; -import java.io.File; +import java.util.Arrays; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; @@ -37,15 +36,33 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import 
org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager; +import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * DFS_HOSTS and DFS_HOSTS_EXCLUDE tests * */ +@RunWith(Parameterized.class) public class TestHostsFiles { private static final Log LOG = LogFactory.getLog(TestHostsFiles.class.getName()); + private Class hostFileMgrClass; + + public TestHostsFiles(Class hostFileMgrClass) { + this.hostFileMgrClass = hostFileMgrClass; + } + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][]{ + {HostFileManager.class}, {CombinedHostFileManager.class}}); + } /* * Return a configuration object with low timeouts for testing and @@ -72,6 +89,10 @@ public class TestHostsFiles { // Indicates we have multiple racks conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz"); + + // Host file manager + conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + hostFileMgrClass, HostConfigManager.class); return conf; } @@ -80,18 +101,8 @@ public class TestHostsFiles { Configuration conf = getConf(); short REPLICATION_FACTOR = 2; final Path filePath = new Path("/testFile"); - - // Configure an excludes file - FileSystem localFileSys = FileSystem.getLocal(conf); - Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); - Path dir = new Path(workingDir, "temp/decommission"); - Path excludeFile = new Path(dir, "exclude"); - Path includeFile = new Path(dir, "include"); - assertTrue(localFileSys.mkdirs(dir)); - DFSTestUtil.writeFile(localFileSys, excludeFile, ""); - DFSTestUtil.writeFile(localFileSys, includeFile, ""); - conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); - conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(conf, "temp/decommission"); // Two blocks and four racks String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"}; @@ -112,9 +123,8 @@ public class TestHostsFiles { BlockLocation locs[] = fs.getFileBlockLocations( fs.getFileStatus(filePath), 0, Long.MAX_VALUE); String name = locs[0].getNames()[0]; - String names = name + "\n" + "localhost:42\n"; - LOG.info("adding '" + names + "' to exclude file " + excludeFile.toUri().getPath()); - DFSTestUtil.writeFile(localFileSys, excludeFile, name); + LOG.info("adding '" + name + "' to decommission"); + hostsFileWriter.initExcludeHost(name); ns.getBlockManager().getDatanodeManager().refreshNodes(conf); DFSTestUtil.waitForDecommission(fs, name); @@ -131,9 +141,7 @@ public class TestHostsFiles { if (cluster != null) { cluster.shutdown(); } - if (localFileSys.exists(dir)) { - FileUtils.deleteQuietly(new File(dir.toUri().getPath())); - } + hostsFileWriter.cleanup(); } } @@ -141,20 +149,10 @@ public class TestHostsFiles { public void testHostsIncludeForDeadCount() throws Exception { Configuration conf = getConf(); - // Configure an excludes file - FileSystem localFileSys = FileSystem.getLocal(conf); - Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); - Path dir = new Path(workingDir, "temp/decommission"); - Path excludeFile = new Path(dir, "exclude"); - Path includeFile = new Path(dir, "include"); - assertTrue(localFileSys.mkdirs(dir)); - StringBuilder includeHosts = new StringBuilder(); - 
includeHosts.append("localhost:52").append("\n").append("127.0.0.1:7777") - .append("\n"); - DFSTestUtil.writeFile(localFileSys, excludeFile, ""); - DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString()); - conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); - conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(conf, "temp/decommission"); + hostsFileWriter.initIncludeHosts(new String[] + {"localhost:52","127.0.0.1:7777"}); MiniDFSCluster cluster = null; try { @@ -174,9 +172,7 @@ public class TestHostsFiles { if (cluster != null) { cluster.shutdown(); } - if (localFileSys.exists(dir)) { - FileUtils.deleteQuietly(new File(dir.toUri().getPath())); - } + hostsFileWriter.cleanup(); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java index e622576745b..c19b9cfa102 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.top.TopConf; +import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator; import org.apache.hadoop.net.ServerSocketUtil; @@ -44,10 +45,10 @@ import org.mortbay.util.ajax.JSON; import javax.management.MBeanServer; import javax.management.ObjectName; import java.io.File; -import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.BindException; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -238,8 +239,8 @@ public class TestNameNodeMXBean { conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); MiniDFSCluster cluster = null; - FileSystem localFileSys = null; - Path dir = null; + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(conf, "temp/TestNameNodeMXBean"); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); @@ -251,18 +252,12 @@ public class TestNameNodeMXBean { ObjectName mxbeanName = new ObjectName( "Hadoop:service=NameNode,name=NameNodeInfo"); - // Define include file to generate deadNodes metrics - localFileSys = FileSystem.getLocal(conf); - Path workingDir = localFileSys.getWorkingDirectory(); - dir = new Path(workingDir,"build/test/data/temp/TestNameNodeMXBean"); - Path includeFile = new Path(dir, "include"); - assertTrue(localFileSys.mkdirs(dir)); - StringBuilder includeHosts = new StringBuilder(); + List hosts = new ArrayList<>(); for(DataNode dn : cluster.getDataNodes()) { - includeHosts.append(dn.getDisplayName()).append("\n"); + hosts.add(dn.getDisplayName()); } - DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString()); - conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); + hostsFileWriter.initIncludeHosts(hosts.toArray( + new 
String[hosts.size()])); fsn.getBlockManager().getDatanodeManager().refreshNodes(conf); cluster.stopDataNode(0); @@ -284,12 +279,10 @@ public class TestNameNodeMXBean { assertTrue(deadNode.containsKey("xferaddr")); } } finally { - if ((localFileSys != null) && localFileSys.exists(dir)) { - FileUtils.deleteQuietly(new File(dir.toUri().getPath())); - } if (cluster != null) { cluster.shutdown(); } + hostsFileWriter.cleanup(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java index 978eef8df0a..2fe25e0e716 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.URI; -import java.util.ArrayList; import java.nio.file.Paths; import java.util.Collection; import java.util.Iterator; @@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.test.GenericTestUtils; @@ -590,27 +590,15 @@ public class TestStartup { @Test public void testNNRestart() throws IOException, InterruptedException { MiniDFSCluster cluster = null; - FileSystem localFileSys; - Path hostsFile; - Path excludeFile; int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds - // Set up the hosts/exclude files. 
- localFileSys = FileSystem.getLocal(config); - Path workingDir = localFileSys.getWorkingDirectory(); - Path dir = new Path(workingDir, "build/test/data/work-dir/restartnn"); - hostsFile = new Path(dir, "hosts"); - excludeFile = new Path(dir, "exclude"); - // Setup conf - config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); - writeConfigFile(localFileSys, excludeFile, null); - config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath()); - // write into hosts file - ArrayListlist = new ArrayList(); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(config, "work-dir/restartnn"); + byte b[] = {127, 0, 0, 1}; InetAddress inetAddress = InetAddress.getByAddress(b); - list.add(inetAddress.getHostName()); - writeConfigFile(localFileSys, hostsFile, list); + hostsFileWriter.initIncludeHosts(new String[] {inetAddress.getHostName()}); + int numDatanodes = 1; try { @@ -635,37 +623,12 @@ public class TestStartup { fail(StringUtils.stringifyException(e)); throw e; } finally { - cleanupFile(localFileSys, excludeFile.getParent()); if (cluster != null) { cluster.shutdown(); } + hostsFileWriter.cleanup(); } } - - private void writeConfigFile(FileSystem localFileSys, Path name, - ArrayList nodes) throws IOException { - // delete if it already exists - if (localFileSys.exists(name)) { - localFileSys.delete(name, true); - } - - FSDataOutputStream stm = localFileSys.create(name); - if (nodes != null) { - for (Iterator it = nodes.iterator(); it.hasNext();) { - String node = it.next(); - stm.writeBytes(node); - stm.writeBytes("\n"); - } - } - stm.close(); - } - - private void cleanupFile(FileSystem fileSys, Path name) throws IOException { - assertTrue(fileSys.exists(name)); - fileSys.delete(name, true); - assertTrue(!fileSys.exists(name)); - } - @Test(timeout = 120000) public void testXattrConfiguration() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java new file mode 100644 index 00000000000..f9a2503b004 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; +import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.util.HostsFileWriter; +import org.apache.hadoop.net.StaticMapping; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * End-to-end test case for upgrade domain + * The test configs upgrade domain for nodes via admin json + * config file and put some nodes to decommission state. + * The test then verifies replicas are placed on the nodes that + * satisfy the upgrade domain policy. + * + */ +public class TestUpgradeDomainBlockPlacementPolicy { + + private static final short REPLICATION_FACTOR = (short) 3; + private static final int DEFAULT_BLOCK_SIZE = 1024; + static final String[] racks = + { "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" }; + /** + * Use host names that can be resolved ( + * InetSocketAddress#isUnresolved == false). Otherwise, + * CombinedHostFileManager won't allow those hosts. 
+ */ + static final String[] hosts = + { "127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1", + "127.0.0.1", "127.0.0.1" }; + static final String[] upgradeDomains = + { "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" }; + static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>(); + private MiniDFSCluster cluster = null; + private NamenodeProtocols nameNodeRpc = null; + private FSNamesystem namesystem = null; + private PermissionStatus perm = null; + + @Before + public void setup() throws IOException { + StaticMapping.resetMap(); + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyWithUpgradeDomain.class, + BlockPlacementPolicy.class); + conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + CombinedHostFileManager.class, HostConfigManager.class); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy"); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks) + .hosts(hosts).build(); + cluster.waitActive(); + nameNodeRpc = cluster.getNameNodeRpc(); + namesystem = cluster.getNamesystem(); + perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null, + FsPermission.getDefault()); + refreshDatanodeAdminProperties(hostsFileWriter); + hostsFileWriter.cleanup(); + } + + @After + public void teardown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + /** + * Define admin properties for these datanodes as follows. + * dn0 and dn3 have upgrade domain ud1. + * dn1 and dn4 have upgrade domain ud2. + * dn2 and dn5 have upgrade domain ud3. + * dn0 and dn5 are decommissioned. + * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on + * rack2. Then any block's replicas should be on either + * {dn1, dn2, dn3} or {dn2, dn3, dn4}.
+ */ + private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter) + throws IOException { + DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[ + hosts.length]; + for (int i = 0; i < hosts.length; i++) { + datanodes[i] = new DatanodeAdminProperties(); + DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId(); + datanodes[i].setHostName(datanodeID.getHostName()); + datanodes[i].setPort(datanodeID.getXferPort()); + datanodes[i].setUpgradeDomain(upgradeDomains[i]); + } + datanodes[0].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED); + datanodes[5].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED); + hostsFileWriter.initIncludeHosts(datanodes); + cluster.getFileSystem().refreshNodes(); + + expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId()); + expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId()); + } + + @Test + public void testPlacement() throws Exception { + String clientMachine = "127.0.0.1"; + for (int i = 0; i < 5; i++) { + String src = "/test-" + i; + // Create the file with client machine + HdfsFileStatus fileStatus = namesystem.startFile(src, perm, + clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, + REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); + LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, + null, null, fileStatus.getFileId(), null, null); + + assertEquals("Block should be allocated sufficient locations", + REPLICATION_FACTOR, locatedBlock.getLocations().length); + Set<DatanodeInfo> locs = new HashSet<>(Arrays.asList( + locatedBlock.getLocations())); + for (DatanodeID datanodeID : expectedDatanodeIDs) { + assertTrue(locs.contains(datanodeID)); + } + + nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), + src, clientMachine); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java new file mode 100644 index 00000000000..cd5ae954971 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hdfs.util; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; + +import static org.junit.Assert.assertTrue; + +public class HostsFileWriter { + private FileSystem localFileSys; + private Path fullDir; + private Path excludeFile; + private Path includeFile; + private Path combinedFile; + private boolean isLegacyHostsFile = false; + + public void initialize(Configuration conf, String dir) throws IOException { + localFileSys = FileSystem.getLocal(conf); + Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); + this.fullDir = new Path(workingDir, dir); + assertTrue(localFileSys.mkdirs(this.fullDir)); + + if (conf.getClass( + DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + HostFileManager.class, HostConfigManager.class).equals( + HostFileManager.class)) { + isLegacyHostsFile = true; + } + if (isLegacyHostsFile) { + excludeFile = new Path(fullDir, "exclude"); + includeFile = new Path(fullDir, "include"); + DFSTestUtil.writeFile(localFileSys, excludeFile, ""); + DFSTestUtil.writeFile(localFileSys, includeFile, ""); + conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); + conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); + } else { + combinedFile = new Path(fullDir, "all"); + conf.set(DFSConfigKeys.DFS_HOSTS, combinedFile.toString()); + } + } + + public void initExcludeHost(String hostNameAndPort) throws IOException { + if (isLegacyHostsFile) { + DFSTestUtil.writeFile(localFileSys, excludeFile, hostNameAndPort); + } else { + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + String [] hostAndPort = hostNameAndPort.split(":"); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + dn.setAdminState(AdminStates.DECOMMISSIONED); + HashSet allDNs = new HashSet<>(); + allDNs.add(dn); + CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs); + } + } + + public void initIncludeHosts(String[] hostNameAndPorts) throws IOException { + StringBuilder includeHosts = new StringBuilder(); + if (isLegacyHostsFile) { + for(String hostNameAndPort : hostNameAndPorts) { + includeHosts.append(hostNameAndPort).append("\n"); + } + DFSTestUtil.writeFile(localFileSys, includeFile, + includeHosts.toString()); + } else { + HashSet allDNs = new HashSet<>(); + for(String hostNameAndPort : hostNameAndPorts) { + String[] hostAndPort = hostNameAndPort.split(":"); + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + allDNs.add(dn); + } + CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs); + } + } + + public void initIncludeHosts(DatanodeAdminProperties[] datanodes) + throws IOException { + CombinedHostsFileWriter.writeFile(combinedFile.toString(), + new HashSet<>(Arrays.asList(datanodes))); + } + + public void cleanup() throws IOException { + if 
(localFileSys.exists(fullDir)) { + FileUtils.deleteQuietly(new File(fullDir.toUri().getPath())); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java new file mode 100644 index 00000000000..c3946e412b2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.io.File; +import java.io.FileWriter; + +import java.util.Set; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/* + * Test for JSON based HostsFileReader + */ +public class TestCombinedHostsFileReader { + + // Using /test/build/data/tmp directory to store temporary files + static final String HOSTS_TEST_DIR = new File(System.getProperty( + "test.build.data", "/tmp")).getAbsolutePath(); + File NEW_FILE = new File(HOSTS_TEST_DIR, "dfs.hosts.new.json"); + + static final String TEST_CACHE_DATA_DIR = + System.getProperty("test.cache.data", "build/test/cache"); + File EXISTING_FILE = new File(TEST_CACHE_DATA_DIR, "dfs.hosts.json"); + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + // Delete test file after running tests + NEW_FILE.delete(); + + } + + /* + * Load the existing test json file + */ + @Test + public void testLoadExistingJsonFile() throws Exception { + Set all = + CombinedHostsFileReader.readFile(EXISTING_FILE.getAbsolutePath()); + assertEquals(5, all.size()); + } + + /* + * Test empty json config file + */ + @Test + public void testEmptyCombinedHostsFileReader() throws Exception { + FileWriter hosts = new FileWriter(NEW_FILE); + hosts.write(""); + hosts.close(); + Set all = + CombinedHostsFileReader.readFile(NEW_FILE.getAbsolutePath()); + assertEquals(0, all.size()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json new file mode 100644 index 00000000000..64fca48dbff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json @@ -0,0 +1,5 @@ +{"hostName": "host1"} +{"hostName": "host2", "upgradeDomain": "ud0"} +{"hostName": "host3", "adminState": "DECOMMISSIONED"} +{"hostName": "host4", "upgradeDomain": "ud2", "adminState": "DECOMMISSIONED"} +{"hostName": "host5", "port": 8090}
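
Taken together, the new DatanodeAdminProperties, CombinedHostsFileWriter/CombinedHostsFileReader utilities and the CombinedHostFileManager provider replace the legacy include/exclude text files with a single JSON-based hosts file that can also carry the upgrade domain and the desired admin state. The sketch below shows how those pieces are intended to fit together; it is not part of the patch. The class name UpgradeDomainConfigExample, the /tmp path, the host names, port and upgrade domain values are made up for illustration; only the DatanodeAdminProperties setters, the static writeFile/readFile helpers and the DFS_HOSTS / DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY settings come from the code above.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
import org.apache.hadoop.hdfs.util.CombinedHostsFileWriter;

public class UpgradeDomainConfigExample {
  public static void main(String[] args) throws Exception {
    // Describe two datanodes. Host names, port and upgrade domains are
    // hypothetical values chosen for illustration.
    DatanodeAdminProperties dn1 = new DatanodeAdminProperties();
    dn1.setHostName("dn1.example.com");
    dn1.setPort(50010);
    dn1.setUpgradeDomain("ud1");

    // The second node is additionally marked for decommission through
    // the same file, instead of a separate exclude file.
    DatanodeAdminProperties dn2 = new DatanodeAdminProperties();
    dn2.setHostName("dn2.example.com");
    dn2.setPort(50010);
    dn2.setUpgradeDomain("ud2");
    dn2.setAdminState(AdminStates.DECOMMISSIONED);

    Set<DatanodeAdminProperties> allDNs = new HashSet<>();
    allDNs.add(dn1);
    allDNs.add(dn2);

    // Serialize the admin properties to the combined JSON hosts file.
    String combinedFile = "/tmp/dfs.hosts.json";
    CombinedHostsFileWriter.writeFile(combinedFile, allDNs);

    // Point the NameNode at the combined file and switch the host
    // provider from the legacy HostFileManager to CombinedHostFileManager.
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_HOSTS, combinedFile);
    conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
        CombinedHostFileManager.class, HostConfigManager.class);

    // Reading the file back yields the same set of admin properties.
    Set<DatanodeAdminProperties> roundTrip =
        CombinedHostsFileReader.readFile(combinedFile);
    System.out.println("Entries read back: " + roundTrip.size());
  }
}

The resulting file carries one JSON object per datanode, as in the dfs.hosts.json test resource above; fields that are left unset keep their defaults, so a bare {"hostName": "host1"} entry is simply an included node in the normal admin state.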