From 02f87683e3febeef28de028dfedc1e12ea39abad Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Sun, 16 Jun 2013 22:11:38 +0000 Subject: [PATCH] YARN-752. In AMRMClient, automatically add corresponding rack requests for requested nodes. (sandyr via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493599 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../apache/hadoop/yarn/client/AMRMClient.java | 39 ++++++--- .../hadoop/yarn/client/AMRMClientImpl.java | 80 ++++++++++++++----- .../hadoop/yarn/client/TestAMRMClient.java | 45 +++++++++++ .../TestAMRMClientContainerRequest.java | 80 +++++++++++++++++++ 5 files changed, 215 insertions(+), 32 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fb32fa3f2e3..ab055ae6274 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -372,6 +372,9 @@ Release 2.1.0-beta - UNRELEASED YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv) + YARN-752. In AMRMClient, automatically add corresponding rack requests for + requested nodes. (sandyr via tucu) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java index 5a0bc50ec49..36a5af4d3b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java @@ -42,23 +42,36 @@ import com.google.common.collect.ImmutableList; public interface AMRMClient extends Service { /** - * Object to represent container request for resources. - * Resources may be localized to nodes and racks. - * Resources may be assigned priorities. - * All getters return unmodifiable collections. - * Can ask for multiple containers of a given type. + * Object to represent container request for resources. Scheduler + * documentation should be consulted for the specifics of how the parameters + * are honored. + * All getters return immutable values. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The racks + * corresponding to any hosts requested will be automatically added to + * this list. + * @param priority + * The priority at which to request the containers. Higher priorities have + * lower numerical values. + * @param containerCount + * The number of containers to request. */ public static class ContainerRequest { final Resource capability; - final ImmutableList hosts; - final ImmutableList racks; + final List nodes; + final List racks; final Priority priority; final int containerCount; - public ContainerRequest(Resource capability, String[] hosts, + public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, int containerCount) { this.capability = capability; - this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null); + this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); this.priority = priority; this.containerCount = containerCount; @@ -68,8 +81,8 @@ public interface AMRMClient extends Servi return capability; } - public List getHosts() { - return hosts; + public List getNodes() { + return nodes; } public List getRacks() { @@ -103,9 +116,9 @@ public interface AMRMClient extends Servi * AMRMClient can remove it from its internal store. */ public static class StoredContainerRequest extends ContainerRequest { - public StoredContainerRequest(Resource capability, String[] hosts, + public StoredContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority) { - super(capability, hosts, racks, priority, 1); + super(capability, nodes, racks, priority, 1); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java index ac392d85b30..f76ea5c6e9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.RackResolver; + +import com.google.common.base.Joiner; import com.google.common.annotations.VisibleForTesting; @@ -139,7 +143,7 @@ public class AMRMClientImpl //Key -> Priority //Value -> Map - //Key->ResourceName (e.g., hostname, rackname, *) + //Key->ResourceName (e.g., nodename, rackname, *) //Value->Map //Key->Resource Capability //Value->ResourceRequest @@ -160,6 +164,7 @@ public class AMRMClientImpl @Override protected void serviceInit(Configuration conf) throws Exception { + RackResolver.init(conf); super.serviceInit(conf); } @@ -309,20 +314,35 @@ public class AMRMClientImpl @Override public synchronized void addContainerRequest(T req) { - // Create resource requests - // add check for dup locations - if (req.hosts != null) { - for (String host : req.hosts) { - addResourceRequest(req.priority, host, req.capability, + Set allRacks = new HashSet(); + if (req.racks != null) { + allRacks.addAll(req.racks); + if(req.racks.size() != allRacks.size()) { + Joiner joiner = Joiner.on(','); + LOG.warn("ContainerRequest has duplicate racks: " + + joiner.join(req.racks)); + } + } + allRacks.addAll(resolveRacks(req.nodes)); + + if (req.nodes != null) { + HashSet dedupedNodes = new HashSet(req.nodes); + if(dedupedNodes.size() != req.nodes.size()) { + Joiner joiner = Joiner.on(','); + LOG.warn("ContainerRequest has duplicate nodes: " + + joiner.join(req.nodes)); + } + for (String node : dedupedNodes) { + // Ensure node requests are accompanied by requests for + // corresponding rack + addResourceRequest(req.priority, node, req.capability, req.containerCount, req); } } - if (req.racks != null) { - for (String rack : req.racks) { - addResourceRequest(req.priority, rack, req.capability, - req.containerCount, req); - } + for (String rack : allRacks) { + addResourceRequest(req.priority, rack, req.capability, + req.containerCount, req); } // Off-switch @@ -332,19 +352,23 @@ public class AMRMClientImpl @Override public synchronized void removeContainerRequest(T req) { + Set allRacks = new HashSet(); + if (req.racks != null) { + allRacks.addAll(req.racks); + } + allRacks.addAll(resolveRacks(req.nodes)); + // Update resource requests - if (req.hosts != null) { - for (String hostName : req.hosts) { - decResourceRequest(req.priority, hostName, req.capability, + if (req.nodes != null) { + for (String node : new HashSet(req.nodes)) { + decResourceRequest(req.priority, node, req.capability, req.containerCount, req); } } - if (req.racks != null) { - for (String rack : req.racks) { - decResourceRequest(req.priority, rack, req.capability, - req.containerCount, req); - } + for (String rack : allRacks) { + decResourceRequest(req.priority, rack, req.capability, + req.containerCount, req); } decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, @@ -404,6 +428,24 @@ public class AMRMClientImpl return list; } + private Set resolveRacks(List nodes) { + Set racks = new HashSet(); + if (nodes != null) { + for (String node : nodes) { + // Ensure node requests are accompanied by requests for + // corresponding rack + String rack = RackResolver.resolve(node).getNetworkLocation(); + if (rack == null) { + LOG.warn("Failed to resolve rack for node " + node + "."); + } else { + racks.add(rack); + } + } + } + + return racks; + } + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. // A ResourceRequest is removed from the remoteRequestTable. A 0 container diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java index 0db5eabfe63..4de458e39e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java @@ -267,6 +267,51 @@ public class TestAMRMClient { assertTrue(matches.size() == 1); assertTrue(matches.get(0).size() == matchSize); } + + @Test (timeout=60000) + public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException { + AMRMClientImpl amClient = null; + try { + // start am rm client + amClient = new AMRMClientImpl(attemptId); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + Resource capability = Resource.newInstance(1024, 2); + + StoredContainerRequest storedContainer1 = + new StoredContainerRequest(capability, nodes, null, priority); + amClient.addContainerRequest(storedContainer1); + + // verify matching with original node and inferred rack + List> matches; + StoredContainerRequest storedRequest; + // exact match node + matches = amClient.getMatchingRequests(priority, node, capability); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertTrue(storedContainer1 == storedRequest); + // inferred match rack + matches = amClient.getMatchingRequests(priority, rack, capability); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertTrue(storedContainer1 == storedRequest); + + // inferred rack match no longer valid after request is removed + amClient.removeContainerRequest(storedContainer1); + matches = amClient.getMatchingRequests(priority, rack, capability); + assertTrue(matches.isEmpty()); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } @Test (timeout=60000) public void testAMRMClientMatchStorage() throws YarnException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java new file mode 100644 index 00000000000..ab2de5bc93e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.junit.Test; + +import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest; +import static org.junit.Assert.assertEquals; + +public class TestAMRMClientContainerRequest { + @Test + public void testFillInRacks() { + AMRMClientImpl client = new AMRMClientImpl( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0)); + + Configuration conf = new Configuration(); + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + client.init(conf); + + Resource capability = Resource.newInstance(1024, 1); + ContainerRequest request = + new ContainerRequest(capability, new String[] {"host1", "host2"}, + new String[] {"/rack2"}, Priority.newInstance(1), 4); + client.addContainerRequest(request); + verifyResourceRequestLocation(client, request, "host1"); + verifyResourceRequestLocation(client, request, "host2"); + verifyResourceRequestLocation(client, request, "/rack1"); + verifyResourceRequestLocation(client, request, "/rack2"); + verifyResourceRequestLocation(client, request, ResourceRequest.ANY); + } + + private static class MyResolver implements DNSToSwitchMapping { + + @Override + public List resolve(List names) { + return Arrays.asList("/rack1"); + } + + @Override + public void reloadCachedMappings() {} + } + + private void verifyResourceRequestLocation( + AMRMClientImpl client, ContainerRequest request, + String location) { + ResourceRequest ask = client.remoteRequestsTable.get(request.priority) + .get(location).get(request.capability).remoteRequest; + assertEquals(location, ask.getResourceName()); + assertEquals(request.getContainerCount(), ask.getNumContainers()); + } +}