YARN-752. In AMRMClient, automatically add corresponding rack requests for requested nodes. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1493601 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-06-16 22:18:30 +00:00
parent 19bf81198e
commit 1d116f7178
5 changed files with 215 additions and 32 deletions

View File

@ -352,6 +352,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
YARN-752. In AMRMClient, automatically add corresponding rack requests for
requested nodes. (sandyr via tucu)
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -42,23 +42,36 @@ import com.google.common.collect.ImmutableList;
public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
/**
* Object to represent container request for resources.
* Resources may be localized to nodes and racks.
* Resources may be assigned priorities.
* All getters return unmodifiable collections.
* Can ask for multiple containers of a given type.
* Object to represent container request for resources. Scheduler
* documentation should be consulted for the specifics of how the parameters
* are honored.
* All getters return immutable values.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The racks
* corresponding to any hosts requested will be automatically added to
* this list.
* @param priority
* The priority at which to request the containers. Higher priorities have
* lower numerical values.
* @param containerCount
* The number of containers to request.
*/
public static class ContainerRequest {
final Resource capability;
final ImmutableList<String> hosts;
final ImmutableList<String> racks;
final List<String> nodes;
final List<String> racks;
final Priority priority;
final int containerCount;
public ContainerRequest(Resource capability, String[] hosts,
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount) {
this.capability = capability;
this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority;
this.containerCount = containerCount;
@ -68,8 +81,8 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
return capability;
}
public List<String> getHosts() {
return hosts;
public List<String> getNodes() {
return nodes;
}
public List<String> getRacks() {
@ -103,9 +116,9 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* AMRMClient can remove it from its internal store.
*/
public static class StoredContainerRequest extends ContainerRequest {
public StoredContainerRequest(Resource capability, String[] hosts,
public StoredContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority) {
super(capability, hosts, racks, priority, 1);
super(capability, nodes, racks, priority, 1);
}
}

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.base.Joiner;
import com.google.common.annotations.VisibleForTesting;
@ -139,7 +143,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
//Key -> Priority
//Value -> Map
//Key->ResourceName (e.g., hostname, rackname, *)
//Key->ResourceName (e.g., nodename, rackname, *)
//Value->Map
//Key->Resource Capability
//Value->ResourceRequest
@ -160,6 +164,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
@Override
protected void serviceInit(Configuration conf) throws Exception {
RackResolver.init(conf);
super.serviceInit(conf);
}
@ -309,21 +314,36 @@ public class AMRMClientImpl<T extends ContainerRequest>
@Override
public synchronized void addContainerRequest(T req) {
// Create resource requests
// add check for dup locations
if (req.hosts != null) {
for (String host : req.hosts) {
addResourceRequest(req.priority, host, req.capability,
Set<String> allRacks = new HashSet<String>();
if (req.racks != null) {
allRacks.addAll(req.racks);
if(req.racks.size() != allRacks.size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate racks: "
+ joiner.join(req.racks));
}
}
allRacks.addAll(resolveRacks(req.nodes));
if (req.nodes != null) {
HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
if(dedupedNodes.size() != req.nodes.size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate nodes: "
+ joiner.join(req.nodes));
}
for (String node : dedupedNodes) {
// Ensure node requests are accompanied by requests for
// corresponding rack
addResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
}
}
if (req.racks != null) {
for (String rack : req.racks) {
for (String rack : allRacks) {
addResourceRequest(req.priority, rack, req.capability,
req.containerCount, req);
}
}
// Off-switch
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
@ -332,20 +352,24 @@ public class AMRMClientImpl<T extends ContainerRequest>
@Override
public synchronized void removeContainerRequest(T req) {
Set<String> allRacks = new HashSet<String>();
if (req.racks != null) {
allRacks.addAll(req.racks);
}
allRacks.addAll(resolveRacks(req.nodes));
// Update resource requests
if (req.hosts != null) {
for (String hostName : req.hosts) {
decResourceRequest(req.priority, hostName, req.capability,
if (req.nodes != null) {
for (String node : new HashSet<String>(req.nodes)) {
decResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
}
}
if (req.racks != null) {
for (String rack : req.racks) {
for (String rack : allRacks) {
decResourceRequest(req.priority, rack, req.capability,
req.containerCount, req);
}
}
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount, req);
@ -404,6 +428,24 @@ public class AMRMClientImpl<T extends ContainerRequest>
return list;
}
private Set<String> resolveRacks(List<String> nodes) {
Set<String> racks = new HashSet<String>();
if (nodes != null) {
for (String node : nodes) {
// Ensure node requests are accompanied by requests for
// corresponding rack
String rack = RackResolver.resolve(node).getNetworkLocation();
if (rack == null) {
LOG.warn("Failed to resolve rack for node " + node + ".");
} else {
racks.add(rack);
}
}
}
return racks;
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container

View File

@ -268,6 +268,51 @@ public class TestAMRMClient {
assertTrue(matches.get(0).size() == matchSize);
}
@Test (timeout=60000)
public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
AMRMClientImpl<StoredContainerRequest> amClient = null;
try {
// start am rm client
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
Resource capability = Resource.newInstance(1024, 2);
StoredContainerRequest storedContainer1 =
new StoredContainerRequest(capability, nodes, null, priority);
amClient.addContainerRequest(storedContainer1);
// verify matching with original node and inferred rack
List<? extends Collection<StoredContainerRequest>> matches;
StoredContainerRequest storedRequest;
// exact match node
matches = amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
// inferred match rack
matches = amClient.getMatchingRequests(priority, rack, capability);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
// inferred rack match no longer valid after request is removed
amClient.removeContainerRequest(storedContainer1);
matches = amClient.getMatchingRequests(priority, rack, capability);
assertTrue(matches.isEmpty());
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
@Test (timeout=60000)
public void testAMRMClientMatchStorage() throws YarnException, IOException {
AMRMClientImpl<StoredContainerRequest> amClient = null;

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.junit.Test;
import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest;
import static org.junit.Assert.assertEquals;
public class TestAMRMClientContainerRequest {
@Test
public void testFillInRacks() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
client.init(conf);
Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request =
new ContainerRequest(capability, new String[] {"host1", "host2"},
new String[] {"/rack2"}, Priority.newInstance(1), 4);
client.addContainerRequest(request);
verifyResourceRequestLocation(client, request, "host1");
verifyResourceRequestLocation(client, request, "host2");
verifyResourceRequestLocation(client, request, "/rack1");
verifyResourceRequestLocation(client, request, "/rack2");
verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
}
private static class MyResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {
return Arrays.asList("/rack1");
}
@Override
public void reloadCachedMappings() {}
}
private void verifyResourceRequestLocation(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location) {
ResourceRequest ask = client.remoteRequestsTable.get(request.priority)
.get(location).get(request.capability).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(request.getContainerCount(), ask.getNumContainers());
}
}