YARN-5390. Federation Subcluster Resolver. Contributed by Ellen Hui.

This commit is contained in:
Subru Krishnan 2016-08-04 15:58:31 -07:00 committed by Carlo Curino
parent 20d1d2be91
commit d3dc461a93
10 changed files with 522 additions and 0 deletions

View File

@ -2555,6 +2555,14 @@ public class YarnConfiguration extends Configuration {
SHARED_CACHE_PREFIX + "nm.uploader.thread-count"; SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20; public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;
////////////////////////////////
// Federation Configs
////////////////////////////////
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list";
//////////////////////////////// ////////////////////////////////
// Other Configs // Other Configs
//////////////////////////////// ////////////////////////////////

View File

@ -2688,6 +2688,13 @@
</property> </property>
<!-- Other Configuration --> <!-- Other Configuration -->
<property>
<description>
Machine list file to be loaded by the Federation SubCluster Resolver
</description>
<name>yarn.federation.machine-list</name>
</property>
<property> <property>
<description>The interval that the yarn client library uses to poll the <description>The interval that the yarn client library uses to poll the
completion status of the asynchronous API of application client protocol. completion status of the asynchronous API of application client protocol.

View File

@ -171,6 +171,16 @@
</filesets> </filesets>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/nodes</exclude>
<exclude>src/test/resources/nodes-malformed</exclude>
</excludes>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.resolver;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import java.util.HashMap;
import java.util.Set;
import java.util.Map;
/**
 * Partial implementation of {@link SubClusterResolver}, containing basic
 * implementations of the read methods.
 */
public abstract class AbstractSubClusterResolver implements SubClusterResolver {

  // Maps a node hostname to the sub-cluster that owns it. Subclasses
  // populate this via getNodeToSubCluster().
  private final Map<String, SubClusterId> nodeToSubCluster =
      new HashMap<String, SubClusterId>();

  // Maps a rack name to the set of sub-clusters that have nodes on that
  // rack. Subclasses populate this via getRackToSubClusters().
  private final Map<String, Set<SubClusterId>> rackToSubClusters =
      new HashMap<String, Set<SubClusterId>>();

  /**
   * Obtain the sub-cluster that a specified node belongs to.
   *
   * @param nodename the node whose sub-cluster is to be determined
   * @return the {@link SubClusterId} the node belongs to
   * @throws YarnException if the node is not present in the mapping
   */
  @Override
  public SubClusterId getSubClusterForNode(String nodename)
      throws YarnException {
    SubClusterId subClusterId = this.nodeToSubCluster.get(nodename);
    if (subClusterId == null) {
      throw new YarnException("Cannot find subClusterId for node " + nodename);
    }
    return subClusterId;
  }

  /**
   * Obtain the sub-clusters that have nodes on a specified rack.
   *
   * @param rackname the name of the rack
   * @return the set of {@link SubClusterId}s with nodes on the rack
   * @throws YarnException if the rack is not present in the mapping
   */
  @Override
  public Set<SubClusterId> getSubClustersForRack(String rackname)
      throws YarnException {
    // Single map lookup instead of containsKey() followed by get().
    Set<SubClusterId> subClusters = this.rackToSubClusters.get(rackname);
    if (subClusters == null) {
      throw new YarnException("Cannot resolve rack " + rackname);
    }
    return subClusters;
  }

  protected Map<String, SubClusterId> getNodeToSubCluster() {
    return nodeToSubCluster;
  }

  protected Map<String, Set<SubClusterId>> getRackToSubClusters() {
    return rackToSubClusters;
  }
}

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.resolver;
import java.io.BufferedReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Default simple sub-cluster and rack resolver class.
 *
 * This class expects a three-column comma separated file, specified in
 * yarn.federation.machine-list. Each line of the file should be of the format:
 *
 * nodeName, subClusterId, rackName
 *
 * Lines that do not follow this format will be ignored. This resolver only
 * loads the file when load() is explicitly called; it will not react to changes
 * to the file.
 *
 * It is case-insensitive on the rack and node names and ignores
 * leading/trailing whitespace.
 */
public class DefaultSubClusterResolverImpl extends AbstractSubClusterResolver
    implements SubClusterResolver {

  private static final Logger LOG =
      LoggerFactory.getLogger(DefaultSubClusterResolverImpl.class);

  private Configuration conf;

  // Index of the node hostname in the machine info file.
  private static final int NODE_NAME_INDEX = 0;

  // Index of the sub-cluster ID in the machine info file.
  private static final int SUBCLUSTER_ID_INDEX = 1;

  // Index of the rack name in the machine info file.
  private static final int RACK_NAME_INDEX = 2;

  // Number of comma-separated fields expected on each well-formed line.
  private static final int NUM_FIELDS = 3;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  /**
   * {@inheritDoc}
   *
   * Node names are upper-cased before lookup, matching the normalization
   * applied in {@link #load()}.
   */
  @Override
  public SubClusterId getSubClusterForNode(String nodename)
      throws YarnException {
    // NOTE(review): String.toUpperCase() is default-locale sensitive;
    // consider toUpperCase(Locale.ROOT) if node names may contain
    // non-ASCII letters — confirm against cluster naming conventions.
    return super.getSubClusterForNode(nodename.toUpperCase());
  }

  /**
   * Loads the node and rack mappings from the file configured in
   * yarn.federation.machine-list. Best-effort: a missing path, invalid
   * path, or I/O failure is logged and leaves the resolver unchanged.
   */
  @Override
  public void load() {
    String fileName =
        this.conf.get(YarnConfiguration.FEDERATION_MACHINE_LIST, "");

    try {
      if (fileName == null || fileName.trim().length() == 0) {
        LOG.info(
            "The machine list file path is not specified in the configuration");
        return;
      }

      Path file;
      try {
        file = Paths.get(fileName);
      } catch (InvalidPathException e) {
        LOG.info("The configured machine list file path {} does not exist",
            fileName);
        return;
      }

      // try-with-resources guarantees the reader is closed on all paths,
      // replacing the original manual try/finally close.
      try (BufferedReader reader =
          Files.newBufferedReader(file, Charset.defaultCharset())) {
        String line;
        while ((line = reader.readLine()) != null) {
          loadLine(line);
        }
      }
      LOG.info("Successfully loaded file {}", fileName);
    } catch (Exception e) {
      // Deliberately broad: loading is best-effort and must not
      // propagate parse or I/O failures to the caller.
      LOG.error("Failed to parse file " + fileName, e);
    }
  }

  /**
   * Parses one machine list line of the form "nodeName, subClusterId,
   * rackName" and records the node and rack mappings. Malformed lines are
   * logged and skipped.
   */
  private void loadLine(String line) {
    String[] tokens = line.split(",");
    if (tokens.length != NUM_FIELDS) {
      LOG.warn("Skipping malformed line in machine list: " + line);
      return;
    }

    // Node and rack names are normalized to upper case so lookups are
    // case-insensitive.
    String nodeName = tokens[NODE_NAME_INDEX].trim().toUpperCase();
    SubClusterId subClusterId =
        SubClusterId.newInstance(tokens[SUBCLUSTER_ID_INDEX].trim());
    String rackName = tokens[RACK_NAME_INDEX].trim().toUpperCase();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Loading node into resolver: {} --> {}", nodeName,
          subClusterId);
      LOG.debug("Loading rack into resolver: {} --> {} ", rackName,
          subClusterId);
    }

    this.getNodeToSubCluster().put(nodeName, subClusterId);
    loadRackToSubCluster(rackName, subClusterId);
  }

  /**
   * Adds a sub-cluster to the set of sub-clusters known to have nodes on
   * the given rack, creating the set on first use.
   */
  private void loadRackToSubCluster(String rackName,
      SubClusterId subClusterId) {
    String rackNameUpper = rackName.toUpperCase();
    if (!this.getRackToSubClusters().containsKey(rackNameUpper)) {
      this.getRackToSubClusters().put(rackNameUpper,
          new HashSet<SubClusterId>());
    }
    this.getRackToSubClusters().get(rackNameUpper).add(subClusterId);
  }

  /**
   * {@inheritDoc}
   *
   * Rack names are upper-cased before lookup, matching the normalization
   * applied in {@link #load()}.
   */
  @Override
  public Set<SubClusterId> getSubClustersForRack(String rackname)
      throws YarnException {
    return super.getSubClustersForRack(rackname.toUpperCase());
  }
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.resolver;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
 * A utility that helps to determine the sub-cluster that a specified node
 * belongs to.
 */
public interface SubClusterResolver extends Configurable {
/**
 * Obtain the sub-cluster that a specified node belongs to.
 *
 * @param nodename the node whose sub-cluster is to be determined
 * @return the sub-cluster as identified by the {@link SubClusterId} that the
 * node belongs to
 * @throws YarnException if the node's sub-cluster cannot be resolved
 */
SubClusterId getSubClusterForNode(String nodename) throws YarnException;
/**
 * Obtain the sub-clusters that have nodes on a specified rack.
 *
 * @param rackname the name of the rack
 * @return the sub-clusters as identified by the {@link SubClusterId} that
 * have nodes on the given rack
 * @throws YarnException if the sub-cluster of any node on the rack cannot be
 * resolved, or if the rack name is not recognized
 */
Set<SubClusterId> getSubClustersForRack(String rackname) throws YarnException;
/**
 * Load the nodes to subCluster mapping from the file.
 *
 * Implementations decide which source is read (e.g. the path configured
 * under the federation machine-list key) and how failures are reported;
 * this interface does not require load() to throw on a missing or
 * malformed source.
 */
void load();
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.resolver;

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.resolver;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.junit.Assert;
import org.junit.Test;
/**
* Test {@link SubClusterResolver} against correct and malformed Federation
* machine lists.
*/
public class TestDefaultSubClusterResolver {
private static YarnConfiguration conf;
private static SubClusterResolver resolver;
public static void setUpGoodFile() {
conf = new YarnConfiguration();
resolver = new DefaultSubClusterResolverImpl();
URL url =
Thread.currentThread().getContextClassLoader().getResource("nodes");
if (url == null) {
throw new RuntimeException(
"Could not find 'nodes' dummy file in classpath");
}
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
resolver.setConf(conf);
resolver.load();
}
private void setUpMalformedFile() {
conf = new YarnConfiguration();
resolver = new DefaultSubClusterResolverImpl();
URL url = Thread.currentThread().getContextClassLoader()
.getResource("nodes-malformed");
if (url == null) {
throw new RuntimeException(
"Could not find 'nodes-malformed' dummy file in classpath");
}
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
resolver.setConf(conf);
resolver.load();
}
private void setUpNonExistentFile() {
conf = new YarnConfiguration();
resolver = new DefaultSubClusterResolverImpl();
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, "fileDoesNotExist");
resolver.setConf(conf);
resolver.load();
}
@Test
public void testGetSubClusterForNode() throws YarnException {
setUpGoodFile();
// All lowercase, no whitespace in machine list file
Assert.assertEquals(SubClusterId.newInstance("subcluster1"),
resolver.getSubClusterForNode("node1"));
// Leading and trailing whitespace in machine list file
Assert.assertEquals(SubClusterId.newInstance("subcluster2"),
resolver.getSubClusterForNode("node2"));
// Node name capitalization in machine list file
Assert.assertEquals(SubClusterId.newInstance("subcluster3"),
resolver.getSubClusterForNode("node3"));
try {
resolver.getSubClusterForNode("nodeDoesNotExist");
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Cannot find subClusterId for node"));
}
}
@Test
public void testGetSubClusterForNodeMalformedFile() throws YarnException {
setUpMalformedFile();
try {
resolver.getSubClusterForNode("node1");
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Cannot find subClusterId for node"));
}
try {
resolver.getSubClusterForNode("node2");
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Cannot find subClusterId for node"));
}
Assert.assertEquals(SubClusterId.newInstance("subcluster3"),
resolver.getSubClusterForNode("node3"));
try {
resolver.getSubClusterForNode("nodeDoesNotExist");
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Cannot find subClusterId for node"));
}
}
@Test
public void testGetSubClusterForNodeNoFile() throws YarnException {
setUpNonExistentFile();
try {
resolver.getSubClusterForNode("node1");
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Cannot find subClusterId for node"));
}
}
@Test
public void testGetSubClustersForRack() throws YarnException {
setUpGoodFile();
Set<SubClusterId> rack1Expected = new HashSet<SubClusterId>();
rack1Expected.add(SubClusterId.newInstance("subcluster1"));
rack1Expected.add(SubClusterId.newInstance("subcluster2"));
Set<SubClusterId> rack2Expected = new HashSet<SubClusterId>();
rack2Expected.add(SubClusterId.newInstance("subcluster3"));
// Two subclusters have nodes in rack1
Assert.assertEquals(rack1Expected, resolver.getSubClustersForRack("rack1"));
// Two nodes are in rack2, but both belong to subcluster3
Assert.assertEquals(rack2Expected, resolver.getSubClustersForRack("rack2"));
try {
resolver.getSubClustersForRack("rackDoesNotExist");
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith("Cannot resolve rack"));
}
}
@Test
public void testGetSubClustersForRackNoFile() throws YarnException {
setUpNonExistentFile();
try {
resolver.getSubClustersForRack("rack1");
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith("Cannot resolve rack"));
}
}
}

View File

@ -0,0 +1,4 @@
node1,subcluster1,rack1
node2 , subcluster2, RACK1
noDE3,subcluster3, rack2
node4, subcluster3, rack2

View File

@ -0,0 +1,3 @@
node1,
node2,subcluster2,subCluster2, rack1
node3,subcluster3, rack2