diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 93437e31eb3..3e778ee617b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2555,6 +2555,14 @@ public class YarnConfiguration extends Configuration { SHARED_CACHE_PREFIX + "nm.uploader.thread-count"; public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20; + //////////////////////////////// + // Federation Configs + //////////////////////////////// + + public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation."; + public static final String FEDERATION_MACHINE_LIST = + FEDERATION_PREFIX + "machine-list"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7ddcfcd969e..e20aad5a955 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2688,6 +2688,13 @@ + + + Machine list file to be loaded by the FederationSubCluster Resolver + + yarn.federation.machine-list + + The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 18492ce2994..89dec303a47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -171,6 +171,16 @@ + + org.apache.rat + apache-rat-plugin + + + src/test/resources/nodes + src/test/resources/nodes-malformed + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java new file mode 100644 index 00000000000..8238633f354 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java @@ -0,0 +1,67 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.federation.resolver; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +import java.util.HashMap; +import java.util.Set; +import java.util.Map; + +/** + * Partial implementation of {@link SubClusterResolver}, containing basic + * implementations of the read methods. + */ +public abstract class AbstractSubClusterResolver implements SubClusterResolver { + private Map nodeToSubCluster = + new HashMap(); + private Map> rackToSubClusters = + new HashMap>(); + + @Override + public SubClusterId getSubClusterForNode(String nodename) + throws YarnException { + SubClusterId subClusterId = this.nodeToSubCluster.get(nodename); + + if (subClusterId == null) { + throw new YarnException("Cannot find subClusterId for node " + nodename); + } + + return subClusterId; + } + + @Override + public Set getSubClustersForRack(String rackname) + throws YarnException { + if (!rackToSubClusters.containsKey(rackname)) { + throw new YarnException("Cannot resolve rack " + rackname); + } + + return rackToSubClusters.get(rackname); + } + + protected Map getNodeToSubCluster() { + return nodeToSubCluster; + } + + protected Map> getRackToSubClusters() { + return rackToSubClusters; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/DefaultSubClusterResolverImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/DefaultSubClusterResolverImpl.java new file mode 100644 index 00000000000..d3c5c269abb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/DefaultSubClusterResolverImpl.java @@ -0,0 +1,164 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.federation.resolver; + +import java.io.BufferedReader; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * Default simple sub-cluster and rack resolver class. + * + * This class expects a three-column comma separated file, specified in + * yarn.federation.machine-list. Each line of the file should be of the format: + * + * nodeName, subClusterId, rackName + * + * Lines that do not follow this format will be ignored. This resolver only + * loads the file when load() is explicitly called; it will not react to changes + * to the file. + * + * It is case-insensitive on the rack and node names and ignores + * leading/trailing whitespace. + * + */ +public class DefaultSubClusterResolverImpl extends AbstractSubClusterResolver + implements SubClusterResolver { + + private static final Logger LOG = + LoggerFactory.getLogger(DefaultSubClusterResolverImpl.class); + private Configuration conf; + + // Index of the node hostname in the machine info file. + private static final int NODE_NAME_INDEX = 0; + + // Index of the sub-cluster ID in the machine info file. + private static final int SUBCLUSTER_ID_INDEX = 1; + + // Index of the rack name ID in the machine info file. + private static final int RACK_NAME_INDEX = 2; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public SubClusterId getSubClusterForNode(String nodename) + throws YarnException { + return super.getSubClusterForNode(nodename.toUpperCase()); + } + + @Override + public void load() { + String fileName = + this.conf.get(YarnConfiguration.FEDERATION_MACHINE_LIST, ""); + + try { + if (fileName == null || fileName.trim().length() == 0) { + LOG.info( + "The machine list file path is not specified in the configuration"); + return; + } + + Path file = null; + BufferedReader reader = null; + + try { + file = Paths.get(fileName); + } catch (InvalidPathException e) { + LOG.info("The configured machine list file path {} does not exist", + fileName); + return; + } + + try { + reader = Files.newBufferedReader(file, Charset.defaultCharset()); + String line = null; + while ((line = reader.readLine()) != null) { + String[] tokens = line.split(","); + if (tokens.length == 3) { + + String nodeName = tokens[NODE_NAME_INDEX].trim().toUpperCase(); + SubClusterId subClusterId = + SubClusterId.newInstance(tokens[SUBCLUSTER_ID_INDEX].trim()); + String rackName = tokens[RACK_NAME_INDEX].trim().toUpperCase(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loading node into resolver: {} --> {}", nodeName, + subClusterId); + LOG.debug("Loading rack into resolver: {} --> {} ", rackName, + subClusterId); + } + + this.getNodeToSubCluster().put(nodeName, subClusterId); + loadRackToSubCluster(rackName, subClusterId); + } else { + LOG.warn("Skipping malformed line in machine list: " + line); + } + } + } finally { + if (reader != null) { + reader.close(); + } + } + LOG.info("Successfully loaded file {}", fileName); + + } catch (Exception e) { + LOG.error("Failed to parse file " + fileName, e); + } + } + + private void loadRackToSubCluster(String rackName, + SubClusterId subClusterId) { + String rackNameUpper = rackName.toUpperCase(); + + if (!this.getRackToSubClusters().containsKey(rackNameUpper)) { + this.getRackToSubClusters().put(rackNameUpper, + new HashSet()); + } + + this.getRackToSubClusters().get(rackNameUpper).add(subClusterId); + + } + + @Override + public Set getSubClustersForRack(String rackname) + throws YarnException { + return super.getSubClustersForRack(rackname.toUpperCase()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java new file mode 100644 index 00000000000..c6adfa6cdf0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java @@ -0,0 +1,58 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.federation.resolver; + +import java.util.Set; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +/** + * An utility that helps to determine the sub-cluster that a specified node + * belongs to. + */ +public interface SubClusterResolver extends Configurable { + + /** + * Obtain the sub-cluster that a specified node belongs to. + * + * @param nodename the node whose sub-cluster is to be determined + * @return the sub-cluster as identified by the {@link SubClusterId} that the + * node belongs to + * @throws YarnException if the node's sub-cluster cannot be resolved + */ + SubClusterId getSubClusterForNode(String nodename) throws YarnException; + + /** + * Obtain the sub-clusters that have nodes on a specified rack. + * + * @param rackname the name of the rack + * @return the sub-clusters as identified by the {@link SubClusterId} that + * have nodes on the given rack + * @throws YarnException if the sub-cluster of any node on the rack cannot be + * resolved, or if the rack name is not recognized + */ + Set getSubClustersForRack(String rackname) throws YarnException; + + /** + * Load the nodes to subCluster mapping from the file. + */ + void load(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/package-info.java new file mode 100644 index 00000000000..c0426608a01 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.resolver; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java new file mode 100644 index 00000000000..7396942c6b1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java @@ -0,0 +1,184 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.federation.resolver; + +import java.net.URL; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test {@link SubClusterResolver} against correct and malformed Federation + * machine lists. + */ +public class TestDefaultSubClusterResolver { + private static YarnConfiguration conf; + private static SubClusterResolver resolver; + + public static void setUpGoodFile() { + conf = new YarnConfiguration(); + resolver = new DefaultSubClusterResolverImpl(); + + URL url = + Thread.currentThread().getContextClassLoader().getResource("nodes"); + if (url == null) { + throw new RuntimeException( + "Could not find 'nodes' dummy file in classpath"); + } + + conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath()); + resolver.setConf(conf); + resolver.load(); + } + + private void setUpMalformedFile() { + conf = new YarnConfiguration(); + resolver = new DefaultSubClusterResolverImpl(); + + URL url = Thread.currentThread().getContextClassLoader() + .getResource("nodes-malformed"); + if (url == null) { + throw new RuntimeException( + "Could not find 'nodes-malformed' dummy file in classpath"); + } + + conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath()); + resolver.setConf(conf); + resolver.load(); + } + + private void setUpNonExistentFile() { + conf = new YarnConfiguration(); + resolver = new DefaultSubClusterResolverImpl(); + + conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, "fileDoesNotExist"); + resolver.setConf(conf); + resolver.load(); + } + + @Test + public void testGetSubClusterForNode() throws YarnException { + setUpGoodFile(); + + // All lowercase, no whitespace in machine list file + Assert.assertEquals(SubClusterId.newInstance("subcluster1"), + resolver.getSubClusterForNode("node1")); + // Leading and trailing whitespace in machine list file + Assert.assertEquals(SubClusterId.newInstance("subcluster2"), + resolver.getSubClusterForNode("node2")); + // Node name capitalization in machine list file + Assert.assertEquals(SubClusterId.newInstance("subcluster3"), + resolver.getSubClusterForNode("node3")); + + try { + resolver.getSubClusterForNode("nodeDoesNotExist"); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Cannot find subClusterId for node")); + } + } + + @Test + public void testGetSubClusterForNodeMalformedFile() throws YarnException { + setUpMalformedFile(); + + try { + resolver.getSubClusterForNode("node1"); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Cannot find subClusterId for node")); + } + + try { + resolver.getSubClusterForNode("node2"); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Cannot find subClusterId for node")); + } + + Assert.assertEquals(SubClusterId.newInstance("subcluster3"), + resolver.getSubClusterForNode("node3")); + + try { + resolver.getSubClusterForNode("nodeDoesNotExist"); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Cannot find subClusterId for node")); + } + } + + @Test + public void testGetSubClusterForNodeNoFile() throws YarnException { + setUpNonExistentFile(); + + try { + resolver.getSubClusterForNode("node1"); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Cannot find subClusterId for node")); + } + } + + @Test + public void testGetSubClustersForRack() throws YarnException { + setUpGoodFile(); + + Set rack1Expected = new HashSet(); + rack1Expected.add(SubClusterId.newInstance("subcluster1")); + rack1Expected.add(SubClusterId.newInstance("subcluster2")); + + Set rack2Expected = new HashSet(); + rack2Expected.add(SubClusterId.newInstance("subcluster3")); + + // Two subclusters have nodes in rack1 + Assert.assertEquals(rack1Expected, resolver.getSubClustersForRack("rack1")); + + // Two nodes are in rack2, but both belong to subcluster3 + Assert.assertEquals(rack2Expected, resolver.getSubClustersForRack("rack2")); + + try { + resolver.getSubClustersForRack("rackDoesNotExist"); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().startsWith("Cannot resolve rack")); + } + } + + @Test + public void testGetSubClustersForRackNoFile() throws YarnException { + setUpNonExistentFile(); + + try { + resolver.getSubClustersForRack("rack1"); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().startsWith("Cannot resolve rack")); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes new file mode 100644 index 00000000000..e4d6112baa5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes @@ -0,0 +1,4 @@ +node1,subcluster1,rack1 + node2 , subcluster2, RACK1 +noDE3,subcluster3, rack2 +node4, subcluster3, rack2 \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes-malformed b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes-malformed new file mode 100644 index 00000000000..6d0aa39fc9c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes-malformed @@ -0,0 +1,3 @@ +node1, +node2,subcluster2,subCluster2, rack1 +node3,subcluster3, rack2 \ No newline at end of file