Merge r1503526 from trunk to branch-2 for YARN-521. Augment AM - RM client module to be able to request containers only at specific locations (Sandy Ryza via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503527 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-07-16 00:28:29 +00:00
parent 293d42edf0
commit 644b6a54ad
6 changed files with 371 additions and 41 deletions

View File

@ -459,6 +459,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-569. Add support for requesting and enforcing preemption requests via YARN-569. Add support for requesting and enforcing preemption requests via
a capacity monitor. (Carlo Curino, cdouglas) a capacity monitor. (Carlo Curino, cdouglas)
YARN-521. Augment AM - RM client module to be able to request containers
only at specific locations (Sandy Ryza via bikas)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -69,24 +69,32 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
} }
/** /**
* Object to represent container request for resources. Scheduler * Object to represent a container request for resources. Scheduler
* documentation should be consulted for the specifics of how the parameters * documentation should be consulted for the specifics of how the parameters
* are honored. * are honored.
* All getters return immutable values.
* *
* @param capability * By default, YARN schedulers try to allocate containers at the requested
* The {@link Resource} to be requested for each container. * locations but they may relax the constraints in order to expedite meeting
* @param nodes * allocations limits. They first relax the constraint to the same rack as the
* Any hosts to request that the containers are placed on. * requested node and then to anywhere in the cluster. The relaxLocality flag
* @param racks * may be used to disable locality relaxation and request containers at only
* Any racks to request that the containers are placed on. The racks * specific locations. The following conditions apply.
* corresponding to any hosts requested will be automatically added to * <ul>
* this list. * <li>Within a priority, all container requests must have the same value for
* @param priority * locality relaxation. Either enabled or disabled.</li>
* The priority at which to request the containers. Higher priorities have * <li>If locality relaxation is disabled, then across requests, locations at
* lower numerical values. * different network levels may not be specified. E.g. its invalid to make a
* @param containerCount * request for a specific node and another request for a specific rack.</li>
* The number of containers to request. * <li>If locality relaxation is disabled, then only within the same request,
* a node and its rack may be specified together. This allows for a specific
* rack with a preference for a specific node within that rack.</li>
* <li></li>
* </ul>
* To re-enable locality relaxation at a given priority, all pending requests
* with locality relaxation disabled must be first removed. Then they can be
* added back with locality relaxation enabled.
*
* All getters return immutable values.
*/ */
public static class ContainerRequest { public static class ContainerRequest {
final Resource capability; final Resource capability;
@ -94,9 +102,55 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
final List<String> racks; final List<String> racks;
final Priority priority; final Priority priority;
final int containerCount; final int containerCount;
final boolean relaxLocality;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
* locality relaxation enabled.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param containerCount
* The number of containers to request.
*/
public ContainerRequest(Resource capability, String[] nodes, public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount) { String[] racks, Priority priority, int containerCount) {
this(capability, nodes, racks, priority, containerCount, true);
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param containerCount
* The number of containers to request.
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
*/
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount,
boolean relaxLocality) {
// Validate request
Preconditions.checkArgument(capability != null, Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " + "The Resource to be requested for each container " +
"should not be null "); "should not be null ");
@ -104,11 +158,17 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
"The priority at which to request containers should not be null "); "The priority at which to request containers should not be null ");
Preconditions.checkArgument(containerCount > 0, Preconditions.checkArgument(containerCount > 0,
"The number of containers to request should larger than 0"); "The number of containers to request should larger than 0");
Preconditions.checkArgument(
(!relaxLocality && (racks == null || racks.length == 0)
&& (nodes == null || nodes.length == 0)),
"Can't turn off locality relaxation on a " +
"request with no location constraints");
this.capability = capability; this.capability = capability;
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority; this.priority = priority;
this.containerCount = containerCount; this.containerCount = containerCount;
this.relaxLocality = relaxLocality;
} }
public Resource getCapability() { public Resource getCapability() {
@ -131,6 +191,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return containerCount; return containerCount;
} }
public boolean getRelaxLocality() {
return relaxLocality;
}
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]"); sb.append("Capability[").append(capability).append("]");
@ -154,6 +218,11 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
String[] racks, Priority priority) { String[] racks, Priority priority) {
super(capability, nodes, racks, priority, 1); super(capability, nodes, racks, priority, 1);
} }
public StoredContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, boolean relaxLocality) {
super(capability, nodes, racks, priority, 1, relaxLocality);
}
} }
/** /**

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
/**
* Thrown when an arguments are combined to construct a
* <code>AMRMClient.ContainerRequest</code> in an invalid way.
*/
public class InvalidContainerRequestException extends YarnRuntimeException {
public InvalidContainerRequestException(Throwable cause) {
super(cause);
}
public InvalidContainerRequestException(String message) {
super(message);
}
public InvalidContainerRequestException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -75,6 +77,8 @@ import com.google.common.base.Preconditions;
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
private static final List<String> ANY_LIST =
Collections.singletonList(ResourceRequest.ANY);
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
@ -91,9 +95,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
LinkedHashSet<T> containerRequests; LinkedHashSet<T> containerRequests;
ResourceRequestInfo(Priority priority, String resourceName, ResourceRequestInfo(Priority priority, String resourceName,
Resource capability) { Resource capability, boolean relaxLocality) {
remoteRequest = ResourceRequest.newInstance(priority, resourceName, remoteRequest = ResourceRequest.newInstance(priority, resourceName,
capability, 0); capability, 0);
remoteRequest.setRelaxLocality(relaxLocality);
containerRequests = new LinkedHashSet<T>(); containerRequests = new LinkedHashSet<T>();
} }
} }
@ -226,7 +231,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
@Override @Override
public AllocateResponse allocate(float progressIndicator) public AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException { throws YarnException, IOException {
Preconditions.checkArgument(progressIndicator > 0, Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative"); "Progress indicator should not be negative");
AllocateResponse allocateResponse = null; AllocateResponse allocateResponse = null;
ArrayList<ResourceRequest> askList = null; ArrayList<ResourceRequest> askList = null;
@ -326,17 +331,30 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
public synchronized void addContainerRequest(T req) { public synchronized void addContainerRequest(T req) {
Preconditions.checkArgument(req != null, Preconditions.checkArgument(req != null,
"Resource request can not be null."); "Resource request can not be null.");
Set<String> allRacks = new HashSet<String>(); Set<String> dedupedRacks = new HashSet<String>();
if (req.getRacks() != null) { if (req.getRacks() != null) {
allRacks.addAll(req.getRacks()); dedupedRacks.addAll(req.getRacks());
if(req.getRacks().size() != allRacks.size()) { if(req.getRacks().size() != dedupedRacks.size()) {
Joiner joiner = Joiner.on(','); Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate racks: " LOG.warn("ContainerRequest has duplicate racks: "
+ joiner.join(req.getRacks())); + joiner.join(req.getRacks()));
} }
} }
allRacks.addAll(resolveRacks(req.getNodes())); Set<String> inferredRacks = resolveRacks(req.getNodes());
inferredRacks.removeAll(dedupedRacks);
// check that specific and non-specific requests cannot be mixed within a
// priority
checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
req.getRelaxLocality());
// check that specific rack cannot be mixed with specific node within a
// priority. If node and its rack are both specified then they must be
// in the same request.
// For explicitly requested racks, we set locality relaxation to true
checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
req.getRelaxLocality());
if (req.getNodes() != null) { if (req.getNodes() != null) {
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes()); HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
if(dedupedNodes.size() != req.getNodes().size()) { if(dedupedNodes.size() != req.getNodes().size()) {
@ -345,21 +363,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+ joiner.join(req.getNodes())); + joiner.join(req.getNodes()));
} }
for (String node : dedupedNodes) { for (String node : dedupedNodes) {
// Ensure node requests are accompanied by requests for
// corresponding rack
addResourceRequest(req.getPriority(), node, req.getCapability(), addResourceRequest(req.getPriority(), node, req.getCapability(),
req.getContainerCount(), req); req.getContainerCount(), req, true);
} }
} }
for (String rack : allRacks) { for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(), addResourceRequest(req.getPriority(), rack, req.getCapability(),
req.getContainerCount(), req); req.getContainerCount(), req, true);
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(),
req.getContainerCount(), req, req.getRelaxLocality());
} }
// Off-switch // Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(), addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
req.getContainerCount(), req); req.getContainerCount(), req, req.getRelaxLocality());
} }
@Override @Override
@ -428,7 +451,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
} }
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo != null) { if (resourceRequestInfo != null &&
!resourceRequestInfo.containerRequests.isEmpty()) {
list.add(resourceRequestInfo.containerRequests); list.add(resourceRequestInfo.containerRequests);
return list; return list;
} }
@ -438,7 +462,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
SortedMap<Resource, ResourceRequestInfo> tailMap = SortedMap<Resource, ResourceRequestInfo> tailMap =
reqMap.tailMap(capability); reqMap.tailMap(capability);
for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) { for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
if(canFit(entry.getKey(), capability)) { if (canFit(entry.getKey(), capability) &&
!entry.getValue().containerRequests.isEmpty()) {
// match found that fits in the larger resource // match found that fits in the larger resource
list.add(entry.getValue().containerRequests); list.add(entry.getValue().containerRequests);
} }
@ -466,6 +491,33 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return racks; return racks;
} }
/**
* ContainerRequests with locality relaxation cannot be made at the same
* priority as ContainerRequests without locality relaxation.
*/
private void checkLocalityRelaxationConflict(Priority priority,
Collection<String> locations, boolean relaxLocality) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
return;
}
// Locality relaxation will be set to relaxLocality for all implicitly
// requested racks. Make sure that existing rack requests match this.
for (String location : locations) {
TreeMap<Resource, ResourceRequestInfo> reqs =
remoteRequests.get(location);
if (reqs != null && !reqs.isEmpty()
&& reqs.values().iterator().next().remoteRequest.getRelaxLocality()
!= relaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location " + location
+ " with locality relaxation " + relaxLocality + " when it has "
+ "already been requested with locality relaxation " + relaxLocality);
}
}
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) { private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario. // This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container // A ResourceRequest is removed from the remoteRequestTable. A 0 container
@ -484,7 +536,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
} }
private void addResourceRequest(Priority priority, String resourceName, private void addResourceRequest(Priority priority, String resourceName,
Resource capability, int containerCount, T req) { Resource capability, int containerCount, T req, boolean relaxLocality) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority); this.remoteRequestsTable.get(priority);
if (remoteRequests == null) { if (remoteRequests == null) {
@ -506,14 +558,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo == null) { if (resourceRequestInfo == null) {
resourceRequestInfo = resourceRequestInfo =
new ResourceRequestInfo(priority, resourceName, capability); new ResourceRequestInfo(priority, resourceName, capability,
relaxLocality);
reqMap.put(capability, resourceRequestInfo); reqMap.put(capability, resourceRequestInfo);
} }
resourceRequestInfo.remoteRequest.setNumContainers( resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() + containerCount); resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
if(req instanceof StoredContainerRequest) { if (req instanceof StoredContainerRequest && relaxLocality) {
resourceRequestInfo.containerRequests.add(req); resourceRequestInfo.containerRequests.add(req);
} }

View File

@ -83,6 +83,7 @@ public class TestAMRMClient {
static Resource capability; static Resource capability;
static Priority priority; static Priority priority;
static Priority priority2;
static String node; static String node;
static String rack; static String rack;
static String[] nodes; static String[] nodes;
@ -105,6 +106,7 @@ public class TestAMRMClient {
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
priority = Priority.newInstance(1); priority = Priority.newInstance(1);
priority2 = Priority.newInstance(2);
capability = Resource.newInstance(1024, 1); capability = Resource.newInstance(1024, 1);
node = nodeReports.get(0).getNodeId().getHost(); node = nodeReports.get(0).getNodeId().getHost();
@ -181,6 +183,7 @@ public class TestAMRMClient {
Resource capability4 = Resource.newInstance(2000, 1); Resource capability4 = Resource.newInstance(2000, 1);
Resource capability5 = Resource.newInstance(1000, 3); Resource capability5 = Resource.newInstance(1000, 3);
Resource capability6 = Resource.newInstance(2000, 1); Resource capability6 = Resource.newInstance(2000, 1);
Resource capability7 = Resource.newInstance(2000, 1);
StoredContainerRequest storedContainer1 = StoredContainerRequest storedContainer1 =
new StoredContainerRequest(capability1, nodes, racks, priority); new StoredContainerRequest(capability1, nodes, racks, priority);
@ -194,12 +197,15 @@ public class TestAMRMClient {
new StoredContainerRequest(capability5, nodes, racks, priority); new StoredContainerRequest(capability5, nodes, racks, priority);
StoredContainerRequest storedContainer6 = StoredContainerRequest storedContainer6 =
new StoredContainerRequest(capability6, nodes, racks, priority); new StoredContainerRequest(capability6, nodes, racks, priority);
StoredContainerRequest storedContainer7 =
new StoredContainerRequest(capability7, nodes, racks, priority2, false);
amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2); amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3); amClient.addContainerRequest(storedContainer3);
amClient.addContainerRequest(storedContainer4); amClient.addContainerRequest(storedContainer4);
amClient.addContainerRequest(storedContainer5); amClient.addContainerRequest(storedContainer5);
amClient.addContainerRequest(storedContainer6); amClient.addContainerRequest(storedContainer6);
amClient.addContainerRequest(storedContainer7);
// test matching of containers // test matching of containers
List<? extends Collection<StoredContainerRequest>> matches; List<? extends Collection<StoredContainerRequest>> matches;
@ -249,6 +255,15 @@ public class TestAMRMClient {
matches = amClient.getMatchingRequests(priority, node, testCapability5); matches = amClient.getMatchingRequests(priority, node, testCapability5);
assert(matches.size() == 0); assert(matches.size() == 0);
// verify requests without relaxed locality are only returned at specific
// locations
Resource testCapability7 = Resource.newInstance(2000, 1);
matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY,
testCapability7);
assert(matches.size() == 0);
matches = amClient.getMatchingRequests(priority2, node, testCapability7);
assert(matches.size() == 1);
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null); null, null);

View File

@ -24,12 +24,15 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.junit.Test; import org.junit.Test;
@ -52,11 +55,159 @@ public class TestAMRMClientContainerRequest {
new ContainerRequest(capability, new String[] {"host1", "host2"}, new ContainerRequest(capability, new String[] {"host1", "host2"},
new String[] {"/rack2"}, Priority.newInstance(1), 4); new String[] {"/rack2"}, Priority.newInstance(1), 4);
client.addContainerRequest(request); client.addContainerRequest(request);
verifyResourceRequestLocation(client, request, "host1"); verifyResourceRequest(client, request, "host1", true);
verifyResourceRequestLocation(client, request, "host2"); verifyResourceRequest(client, request, "host2", true);
verifyResourceRequestLocation(client, request, "/rack1"); verifyResourceRequest(client, request, "/rack1", true);
verifyResourceRequestLocation(client, request, "/rack2"); verifyResourceRequest(client, request, "/rack2", true);
verifyResourceRequestLocation(client, request, ResourceRequest.ANY); verifyResourceRequest(client, request, ResourceRequest.ANY, true);
}
@Test
public void testDisableLocalityRelaxation() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
client.init(conf);
Resource capability = Resource.newInstance(1024, 1);
ContainerRequest nodeLevelRequest =
new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false);
client.addContainerRequest(nodeLevelRequest);
verifyResourceRequest(client, nodeLevelRequest, ResourceRequest.ANY, false);
verifyResourceRequest(client, nodeLevelRequest, "/rack1", false);
verifyResourceRequest(client, nodeLevelRequest, "host1", true);
verifyResourceRequest(client, nodeLevelRequest, "host2", true);
// Make sure we don't get any errors with two node-level requests at the
// same priority
ContainerRequest nodeLevelRequest2 =
new ContainerRequest(capability, new String[] {"host2", "host3"},
null, Priority.newInstance(1), 4, false);
client.addContainerRequest(nodeLevelRequest2);
AMRMClient.ContainerRequest rackLevelRequest =
new AMRMClient.ContainerRequest(capability, null,
new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), 3, false);
client.addContainerRequest(rackLevelRequest);
verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY, false);
verifyResourceRequest(client, rackLevelRequest, "/rack3", true);
verifyResourceRequest(client, rackLevelRequest, "/rack4", true);
// Make sure we don't get any errors with two rack-level requests at the
// same priority
AMRMClient.ContainerRequest rackLevelRequest2 =
new AMRMClient.ContainerRequest(capability, null,
new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), 3, false);
client.addContainerRequest(rackLevelRequest2);
ContainerRequest bothLevelRequest =
new ContainerRequest(capability, new String[] {"host3", "host4"},
new String[] {"rack1", "/otherrack"},
Priority.newInstance(3), 4, false);
client.addContainerRequest(bothLevelRequest);
verifyResourceRequest(client, bothLevelRequest, ResourceRequest.ANY, false);
verifyResourceRequest(client, bothLevelRequest, "rack1",
true);
verifyResourceRequest(client, bothLevelRequest, "/otherrack",
true);
verifyResourceRequest(client, bothLevelRequest, "host3", true);
verifyResourceRequest(client, bothLevelRequest, "host4", true);
// Make sure we don't get any errors with two both-level requests at the
// same priority
ContainerRequest bothLevelRequest2 =
new ContainerRequest(capability, new String[] {"host4", "host5"},
new String[] {"rack1", "/otherrack2"},
Priority.newInstance(3), 4, false);
client.addContainerRequest(bothLevelRequest2);
}
@Test (expected = InvalidContainerRequestException.class)
public void testDifferentLocalityRelaxationSamePriority() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
client.init(conf);
Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request1 =
new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false);
client.addContainerRequest(request1);
ContainerRequest request2 =
new ContainerRequest(capability, new String[] {"host3"},
null, Priority.newInstance(1), 4, true);
client.addContainerRequest(request2);
}
@Test
public void testInvalidValidWhenOldRemoved() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
client.init(conf);
Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request1 =
new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false);
client.addContainerRequest(request1);
client.removeContainerRequest(request1);
ContainerRequest request2 =
new ContainerRequest(capability, new String[] {"host3"},
null, Priority.newInstance(1), 4, true);
client.addContainerRequest(request2);
client.removeContainerRequest(request2);
ContainerRequest request3 =
new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false);
client.addContainerRequest(request3);
client.removeContainerRequest(request3);
ContainerRequest request4 =
new ContainerRequest(capability, null,
new String[] {"rack1"}, Priority.newInstance(1), 4, true);
client.addContainerRequest(request4);
}
@Test (expected = InvalidContainerRequestException.class)
public void testLocalityRelaxationDifferentLevels() {
AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
client.init(conf);
Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request1 =
new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false);
client.addContainerRequest(request1);
ContainerRequest request2 =
new ContainerRequest(capability, null,
new String[] {"rack1"}, Priority.newInstance(1), 4, true);
client.addContainerRequest(request2);
} }
private static class MyResolver implements DNSToSwitchMapping { private static class MyResolver implements DNSToSwitchMapping {
@ -70,12 +221,13 @@ public class TestAMRMClientContainerRequest {
public void reloadCachedMappings() {} public void reloadCachedMappings() {}
} }
private void verifyResourceRequestLocation( private void verifyResourceRequest(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request, AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location) { String location, boolean expectedRelaxLocality) {
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority()) ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
.get(location).get(request.getCapability()).remoteRequest; .get(location).get(request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName()); assertEquals(location, ask.getResourceName());
assertEquals(request.getContainerCount(), ask.getNumContainers()); assertEquals(request.getContainerCount(), ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
} }
} }