Merge r1503960 from trunk to branch-2 for YARN-927. Change ContainerRequest to not have more than 1 container count and remove StoreContainerRequest (bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503965 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-07-17 01:01:03 +00:00
parent a55ae9e988
commit ace3a40b23
7 changed files with 121 additions and 146 deletions

View File

@ -474,6 +474,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-513. Create common proxy client for communicating with RM. (Xuan Gong YARN-513. Create common proxy client for communicating with RM. (Xuan Gong
& Jian He via bikas) & Jian He via bikas)
YARN-927. Change ContainerRequest to not have more than 1 container count
and remove StoreContainerRequest (bikas)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -483,8 +483,10 @@ public class ApplicationMaster {
// containers // containers
// Keep looping until all the containers are launched and shell script // Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure). // executed on them ( regardless of success/failure).
ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers); for (int i = 0; i < numTotalContainers; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
resourceManager.addContainerRequest(containerAsk); resourceManager.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainers); numRequestedContainers.set(numTotalContainers);
while (!done) { while (!done) {
@ -591,9 +593,11 @@ public class ApplicationMaster {
numRequestedContainers.addAndGet(askCount); numRequestedContainers.addAndGet(askCount);
if (askCount > 0) { if (askCount > 0) {
ContainerRequest containerAsk = setupContainerAskForRM(askCount); for (int i = 0; i < askCount; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
resourceManager.addContainerRequest(containerAsk); resourceManager.addContainerRequest(containerAsk);
} }
}
if (numCompletedContainers.get() == numTotalContainers) { if (numCompletedContainers.get() == numTotalContainers) {
done = true; done = true;
@ -813,7 +817,7 @@ public class ApplicationMaster {
* @param numContainers Containers to ask for from RM * @param numContainers Containers to ask for from RM
* @return the setup ResourceRequest to be sent to RM * @return the setup ResourceRequest to be sent to RM
*/ */
private ContainerRequest setupContainerAskForRM(int numContainers) { private ContainerRequest setupContainerAskForRM() {
// setup requirements for hosts // setup requirements for hosts
// using * as any host will do for the distributed shell app // using * as any host will do for the distributed shell app
// set the priority for the request // set the priority for the request
@ -827,7 +831,7 @@ public class ApplicationMaster {
capability.setMemory(containerMemory); capability.setMemory(containerMemory);
ContainerRequest request = new ContainerRequest(capability, null, null, ContainerRequest request = new ContainerRequest(capability, null, null,
pri, numContainers); pri);
LOG.info("Requested container ask: " + request.toString()); LOG.info("Requested container ask: " + request.toString());
return request; return request;
} }

View File

@ -69,7 +69,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
} }
/** /**
* Object to represent a container request for resources. Scheduler * Object to represent a single container request for resources. Scheduler
* documentation should be consulted for the specifics of how the parameters * documentation should be consulted for the specifics of how the parameters
* are honored. * are honored.
* *
@ -101,7 +101,6 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
final List<String> nodes; final List<String> nodes;
final List<String> racks; final List<String> racks;
final Priority priority; final Priority priority;
final int containerCount;
final boolean relaxLocality; final boolean relaxLocality;
/** /**
@ -119,12 +118,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* @param priority * @param priority
* The priority at which to request the containers. Higher * The priority at which to request the containers. Higher
* priorities have lower numerical values. * priorities have lower numerical values.
* @param containerCount
* The number of containers to request.
*/ */
public ContainerRequest(Resource capability, String[] nodes, public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount) { String[] racks, Priority priority) {
this(capability, nodes, racks, priority, containerCount, true); this(capability, nodes, racks, priority, true);
} }
/** /**
@ -141,23 +138,18 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* @param priority * @param priority
* The priority at which to request the containers. Higher * The priority at which to request the containers. Higher
* priorities have lower numerical values. * priorities have lower numerical values.
* @param containerCount
* The number of containers to request.
* @param relaxLocality * @param relaxLocality
* If true, containers for this request may be assigned on hosts * If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested. * and racks other than the ones explicitly requested.
*/ */
public ContainerRequest(Resource capability, String[] nodes, public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount, String[] racks, Priority priority, boolean relaxLocality) {
boolean relaxLocality) {
// Validate request // Validate request
Preconditions.checkArgument(capability != null, Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " + "The Resource to be requested for each container " +
"should not be null "); "should not be null ");
Preconditions.checkArgument(priority != null, Preconditions.checkArgument(priority != null,
"The priority at which to request containers should not be null "); "The priority at which to request containers should not be null ");
Preconditions.checkArgument(containerCount > 0,
"The number of containers to request should larger than 0");
Preconditions.checkArgument( Preconditions.checkArgument(
!(!relaxLocality && (racks == null || racks.length == 0) !(!relaxLocality && (racks == null || racks.length == 0)
&& (nodes == null || nodes.length == 0)), && (nodes == null || nodes.length == 0)),
@ -167,7 +159,6 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority; this.priority = priority;
this.containerCount = containerCount;
this.relaxLocality = relaxLocality; this.relaxLocality = relaxLocality;
} }
@ -187,10 +178,6 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return priority; return priority;
} }
public int getContainerCount() {
return containerCount;
}
public boolean getRelaxLocality() { public boolean getRelaxLocality() {
return relaxLocality; return relaxLocality;
} }
@ -199,32 +186,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]"); sb.append("Capability[").append(capability).append("]");
sb.append("Priority[").append(priority).append("]"); sb.append("Priority[").append(priority).append("]");
sb.append("ContainerCount[").append(containerCount).append("]");
return sb.toString(); return sb.toString();
} }
} }
/**
* This creates a <code>ContainerRequest</code> for 1 container and the
* AMRMClient stores this request internally. <code>getMatchingRequests</code>
* can be used to retrieve these requests from AMRMClient. These requests may
* be matched with an allocated container to determine which request to assign
* the container to. <code>removeContainerRequest</code> must be called using
* the same assigned <code>StoredContainerRequest</code> object so that
* AMRMClient can remove it from its internal store.
*/
public static class StoredContainerRequest extends ContainerRequest {
public StoredContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority) {
super(capability, nodes, racks, priority, 1);
}
public StoredContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, boolean relaxLocality) {
super(capability, nodes, racks, priority, 1, relaxLocality);
}
}
/** /**
* Register the application master. This must be called before any * Register the application master. This must be called before any
* other interaction * other interaction
@ -311,8 +276,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
public abstract int getClusterNodeCount(); public abstract int getClusterNodeCount();
/** /**
* Get outstanding <code>StoredContainerRequest</code>s matching the given * Get outstanding <code>ContainerRequest</code>s matching the given
* parameters. These StoredContainerRequests should have been added via * parameters. These ContainerRequests should have been added via
* <code>addContainerRequest</code> earlier in the lifecycle. For performance, * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
* the AMRMClient may return its internal collection directly without creating * the AMRMClient may return its internal collection directly without creating
* a copy. Users should not perform mutable operations on the return value. * a copy. Users should not perform mutable operations on the return value.

View File

@ -343,26 +343,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+ joiner.join(req.getNodes())); + joiner.join(req.getNodes()));
} }
for (String node : dedupedNodes) { for (String node : dedupedNodes) {
addResourceRequest(req.getPriority(), node, req.getCapability(), addResourceRequest(req.getPriority(), node, req.getCapability(), req,
req.getContainerCount(), req, true); true);
} }
} }
for (String rack : dedupedRacks) { for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(), addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
req.getContainerCount(), req, true); true);
} }
// Ensure node requests are accompanied by requests for // Ensure node requests are accompanied by requests for
// corresponding rack // corresponding rack
for (String rack : inferredRacks) { for (String rack : inferredRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(), addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
req.getContainerCount(), req, req.getRelaxLocality()); req.getRelaxLocality());
} }
// Off-switch // Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(), addResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getContainerCount(), req, req.getRelaxLocality()); req.getCapability(), req, req.getRelaxLocality());
} }
@Override @Override
@ -378,18 +378,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// Update resource requests // Update resource requests
if (req.getNodes() != null) { if (req.getNodes() != null) {
for (String node : new HashSet<String>(req.getNodes())) { for (String node : new HashSet<String>(req.getNodes())) {
decResourceRequest(req.getPriority(), node, req.getCapability(), decResourceRequest(req.getPriority(), node, req.getCapability(), req);
req.getContainerCount(), req);
} }
} }
for (String rack : allRacks) { for (String rack : allRacks) {
decResourceRequest(req.getPriority(), rack, req.getCapability(), decResourceRequest(req.getPriority(), rack, req.getCapability(), req);
req.getContainerCount(), req);
} }
decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(), decResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getContainerCount(), req); req.getCapability(), req);
} }
@Override @Override
@ -516,7 +514,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
} }
private void addResourceRequest(Priority priority, String resourceName, private void addResourceRequest(Priority priority, String resourceName,
Resource capability, int containerCount, T req, boolean relaxLocality) { Resource capability, T req, boolean relaxLocality) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority); this.remoteRequestsTable.get(priority);
if (remoteRequests == null) { if (remoteRequests == null) {
@ -544,9 +542,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
} }
resourceRequestInfo.remoteRequest.setNumContainers( resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() + containerCount); resourceRequestInfo.remoteRequest.getNumContainers() + 1);
if (req instanceof StoredContainerRequest && relaxLocality) { if (relaxLocality) {
resourceRequestInfo.containerRequests.add(req); resourceRequestInfo.containerRequests.add(req);
} }
@ -565,7 +563,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private void decResourceRequest(Priority priority, private void decResourceRequest(Priority priority,
String resourceName, String resourceName,
Resource capability, Resource capability,
int containerCount,
T req) { T req) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority); this.remoteRequestsTable.get(priority);
@ -597,11 +594,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
} }
resourceRequestInfo.remoteRequest.setNumContainers( resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() - containerCount); resourceRequestInfo.remoteRequest.getNumContainers() - 1);
if(req instanceof StoredContainerRequest) {
resourceRequestInfo.containerRequests.remove(req); resourceRequestInfo.containerRequests.remove(req);
}
if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) { if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
// guard against spurious removals // guard against spurious removals

View File

@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
@ -93,6 +92,7 @@ public class TestAMRMClient {
public static void setup() throws Exception { public static void setup() throws Exception {
// start minicluster // start minicluster
conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();
@ -169,10 +169,10 @@ public class TestAMRMClient {
@Test (timeout=60000) @Test (timeout=60000)
public void testAMRMClientMatchingFit() throws YarnException, IOException { public void testAMRMClientMatchingFit() throws YarnException, IOException {
AMRMClient<StoredContainerRequest> amClient = null; AMRMClient<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = AMRMClient.<StoredContainerRequest>createAMRMClient(attemptId); amClient = AMRMClient.<ContainerRequest>createAMRMClient(attemptId);
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
@ -185,20 +185,20 @@ public class TestAMRMClient {
Resource capability6 = Resource.newInstance(2000, 1); Resource capability6 = Resource.newInstance(2000, 1);
Resource capability7 = Resource.newInstance(2000, 1); Resource capability7 = Resource.newInstance(2000, 1);
StoredContainerRequest storedContainer1 = ContainerRequest storedContainer1 =
new StoredContainerRequest(capability1, nodes, racks, priority); new ContainerRequest(capability1, nodes, racks, priority);
StoredContainerRequest storedContainer2 = ContainerRequest storedContainer2 =
new StoredContainerRequest(capability2, nodes, racks, priority); new ContainerRequest(capability2, nodes, racks, priority);
StoredContainerRequest storedContainer3 = ContainerRequest storedContainer3 =
new StoredContainerRequest(capability3, nodes, racks, priority); new ContainerRequest(capability3, nodes, racks, priority);
StoredContainerRequest storedContainer4 = ContainerRequest storedContainer4 =
new StoredContainerRequest(capability4, nodes, racks, priority); new ContainerRequest(capability4, nodes, racks, priority);
StoredContainerRequest storedContainer5 = ContainerRequest storedContainer5 =
new StoredContainerRequest(capability5, nodes, racks, priority); new ContainerRequest(capability5, nodes, racks, priority);
StoredContainerRequest storedContainer6 = ContainerRequest storedContainer6 =
new StoredContainerRequest(capability6, nodes, racks, priority); new ContainerRequest(capability6, nodes, racks, priority);
StoredContainerRequest storedContainer7 = ContainerRequest storedContainer7 =
new StoredContainerRequest(capability7, nodes, racks, priority2, false); new ContainerRequest(capability7, nodes, racks, priority2, false);
amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2); amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3); amClient.addContainerRequest(storedContainer3);
@ -208,8 +208,8 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer7); amClient.addContainerRequest(storedContainer7);
// test matching of containers // test matching of containers
List<? extends Collection<StoredContainerRequest>> matches; List<? extends Collection<ContainerRequest>> matches;
StoredContainerRequest storedRequest; ContainerRequest storedRequest;
// exact match // exact match
Resource testCapability1 = Resource.newInstance(1024, 2); Resource testCapability1 = Resource.newInstance(1024, 2);
matches = amClient.getMatchingRequests(priority, node, testCapability1); matches = amClient.getMatchingRequests(priority, node, testCapability1);
@ -224,7 +224,7 @@ public class TestAMRMClient {
verifyMatches(matches, 2); verifyMatches(matches, 2);
// must be returned in the order they were made // must be returned in the order they were made
int i = 0; int i = 0;
for(StoredContainerRequest storedRequest1 : matches.get(0)) { for(ContainerRequest storedRequest1 : matches.get(0)) {
if(i++ == 0) { if(i++ == 0) {
assertTrue(storedContainer4 == storedRequest1); assertTrue(storedContainer4 == storedRequest1);
} else { } else {
@ -242,9 +242,9 @@ public class TestAMRMClient {
matches = amClient.getMatchingRequests(priority, node, testCapability4); matches = amClient.getMatchingRequests(priority, node, testCapability4);
assert(matches.size() == 2); assert(matches.size() == 2);
// verify non-fitting containers are not returned and fitting ones are // verify non-fitting containers are not returned and fitting ones are
for(Collection<StoredContainerRequest> testSet : matches) { for(Collection<ContainerRequest> testSet : matches) {
assertTrue(testSet.size() == 1); assertTrue(testSet.size() == 1);
StoredContainerRequest testRequest = testSet.iterator().next(); ContainerRequest testRequest = testSet.iterator().next();
assertTrue(testRequest != storedContainer4); assertTrue(testRequest != storedContainer4);
assertTrue(testRequest != storedContainer5); assertTrue(testRequest != storedContainer5);
assert(testRequest == storedContainer2 || assert(testRequest == storedContainer2 ||
@ -275,7 +275,7 @@ public class TestAMRMClient {
} }
private void verifyMatches( private void verifyMatches(
List<? extends Collection<StoredContainerRequest>> matches, List<? extends Collection<ContainerRequest>> matches,
int matchSize) { int matchSize) {
assertTrue(matches.size() == 1); assertTrue(matches.size() == 1);
assertTrue(matches.get(0).size() == matchSize); assertTrue(matches.get(0).size() == matchSize);
@ -283,23 +283,23 @@ public class TestAMRMClient {
@Test (timeout=60000) @Test (timeout=60000)
public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException { public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
AMRMClientImpl<StoredContainerRequest> amClient = null; AMRMClientImpl<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId); amClient = new AMRMClientImpl<ContainerRequest>(attemptId);
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
Resource capability = Resource.newInstance(1024, 2); Resource capability = Resource.newInstance(1024, 2);
StoredContainerRequest storedContainer1 = ContainerRequest storedContainer1 =
new StoredContainerRequest(capability, nodes, null, priority); new ContainerRequest(capability, nodes, null, priority);
amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer1);
// verify matching with original node and inferred rack // verify matching with original node and inferred rack
List<? extends Collection<StoredContainerRequest>> matches; List<? extends Collection<ContainerRequest>> matches;
StoredContainerRequest storedRequest; ContainerRequest storedRequest;
// exact match node // exact match node
matches = amClient.getMatchingRequests(priority, node, capability); matches = amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 1); verifyMatches(matches, 1);
@ -326,14 +326,14 @@ public class TestAMRMClient {
} }
} }
@Test (timeout=60000) @Test //(timeout=60000)
public void testAMRMClientMatchStorage() throws YarnException, IOException { public void testAMRMClientMatchStorage() throws YarnException, IOException {
AMRMClientImpl<StoredContainerRequest> amClient = null; AMRMClientImpl<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = amClient =
(AMRMClientImpl<StoredContainerRequest>) AMRMClient (AMRMClientImpl<ContainerRequest>) AMRMClient
.<StoredContainerRequest> createAMRMClient(attemptId); .<ContainerRequest> createAMRMClient(attemptId);
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
@ -341,12 +341,12 @@ public class TestAMRMClient {
Priority priority1 = Records.newRecord(Priority.class); Priority priority1 = Records.newRecord(Priority.class);
priority1.setPriority(2); priority1.setPriority(2);
StoredContainerRequest storedContainer1 = ContainerRequest storedContainer1 =
new StoredContainerRequest(capability, nodes, racks, priority); new ContainerRequest(capability, nodes, racks, priority);
StoredContainerRequest storedContainer2 = ContainerRequest storedContainer2 =
new StoredContainerRequest(capability, nodes, racks, priority); new ContainerRequest(capability, nodes, racks, priority);
StoredContainerRequest storedContainer3 = ContainerRequest storedContainer3 =
new StoredContainerRequest(capability, null, null, priority1); new ContainerRequest(capability, null, null, priority1);
amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2); amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3); amClient.addContainerRequest(storedContainer3);
@ -358,7 +358,7 @@ public class TestAMRMClient {
containersRequestedAny = amClient.remoteRequestsTable.get(priority1) containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedAny == 1); assertTrue(containersRequestedAny == 1);
List<? extends Collection<StoredContainerRequest>> matches = List<? extends Collection<ContainerRequest>> matches =
amClient.getMatchingRequests(priority, node, capability); amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 2); verifyMatches(matches, 2);
matches = amClient.getMatchingRequests(priority, rack, capability); matches = amClient.getMatchingRequests(priority, rack, capability);
@ -383,7 +383,7 @@ public class TestAMRMClient {
verifyMatches(matches, 1); verifyMatches(matches, 1);
// test matching of containers // test matching of containers
StoredContainerRequest storedRequest = matches.get(0).iterator().next(); ContainerRequest storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest); assertTrue(storedContainer1 == storedRequest);
amClient.removeContainerRequest(storedContainer1); amClient.removeContainerRequest(storedContainer1);
matches = matches =
@ -400,7 +400,7 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer3); amClient.addContainerRequest(storedContainer3);
// RM should allocate container within 2 calls to allocate() // RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0; int allocatedContainerCount = 0;
int iterationsLeft = 2; int iterationsLeft = 3;
while (allocatedContainerCount < 2 while (allocatedContainerCount < 2
&& iterationsLeft-- > 0) { && iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f); AllocateResponse allocResponse = amClient.allocate(0.1f);
@ -420,24 +420,23 @@ public class TestAMRMClient {
verifyMatches(matches, 1); verifyMatches(matches, 1);
ContainerRequest matchedRequest = matches.get(0).iterator().next(); ContainerRequest matchedRequest = matches.get(0).iterator().next();
assertTrue(matchedRequest == expectedRequest); assertTrue(matchedRequest == expectedRequest);
amClient.removeContainerRequest(matchedRequest);
// assign this container, use it and release it // assign this container, use it and release it
amClient.releaseAssignedContainer(container.getId()); amClient.releaseAssignedContainer(container.getId());
} }
if(allocatedContainerCount < containersRequestedAny) { if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations // sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000); sleep(100);
} }
} }
assertTrue(allocatedContainerCount == 2); assertTrue(allocatedContainerCount == 2);
assertTrue(amClient.release.size() == 2);
assertTrue(amClient.ask.size() == 0);
AllocateResponse allocResponse = amClient.allocate(0.1f); AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.release.size() == 0); assertTrue(amClient.release.size() == 0);
assertTrue(amClient.ask.size() == 0); assertTrue(amClient.ask.size() == 0);
assertTrue(allocResponse.getAllocatedContainers().size() == 0); assertTrue(allocResponse.getAllocatedContainers().size() == 0);
// 0 requests left. everything got cleaned up
assertTrue(amClient.remoteRequestsTable.isEmpty());
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null); null, null);
@ -480,11 +479,17 @@ public class TestAMRMClient {
assertTrue(amClient.release.size() == 0); assertTrue(amClient.release.size() == 0);
amClient.addContainerRequest( amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 1)); new ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest( amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 3)); new ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest( amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 2)); new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority) int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
.get(node).get(capability).remoteRequest.getNumContainers(); .get(node).get(capability).remoteRequest.getNumContainers();
@ -501,7 +506,7 @@ public class TestAMRMClient {
// RM should allocate container within 2 calls to allocate() // RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0; int allocatedContainerCount = 0;
int iterationsLeft = 2; int iterationsLeft = 3;
Set<ContainerId> releases = new TreeSet<ContainerId>(); Set<ContainerId> releases = new TreeSet<ContainerId>();
NMTokenCache.clearCache(); NMTokenCache.clearCache();
@ -532,7 +537,7 @@ public class TestAMRMClient {
if(allocatedContainerCount < containersRequestedAny) { if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations // sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000); sleep(100);
} }
} }
@ -546,7 +551,9 @@ public class TestAMRMClient {
// need to tell the AMRMClient that we dont need these resources anymore // need to tell the AMRMClient that we dont need these resources anymore
amClient.removeContainerRequest( amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 2)); new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
assertTrue(amClient.ask.size() == 3); assertTrue(amClient.ask.size() == 3);
// send 0 container count request for resources that are no longer needed // send 0 container count request for resources that are no longer needed
ResourceRequest snoopRequest = amClient.ask.iterator().next(); ResourceRequest snoopRequest = amClient.ask.iterator().next();
@ -554,7 +561,9 @@ public class TestAMRMClient {
// test RPC exception handling // test RPC exception handling
amClient.addContainerRequest(new ContainerRequest(capability, nodes, amClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority, 2)); racks, priority));
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority));
snoopRequest = amClient.ask.iterator().next(); snoopRequest = amClient.ask.iterator().next();
assertTrue(snoopRequest.getNumContainers() == 2); assertTrue(snoopRequest.getNumContainers() == 2);
@ -567,7 +576,9 @@ public class TestAMRMClient {
throws Exception { throws Exception {
amClient.removeContainerRequest( amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, new ContainerRequest(capability, nodes,
racks, priority, 2)); racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
throw new Exception(); throw new Exception();
} }
}); });
@ -585,7 +596,7 @@ public class TestAMRMClient {
// has not been lost // has not been lost
assertTrue(snoopRequest.getNumContainers() == 0); assertTrue(snoopRequest.getNumContainers() == 0);
iterationsLeft = 2; iterationsLeft = 3;
// do a few iterations to ensure RM is not going send new containers // do a few iterations to ensure RM is not going send new containers
while(!releases.isEmpty() || iterationsLeft-- > 0) { while(!releases.isEmpty() || iterationsLeft-- > 0) {
// inform RM of rejection // inform RM of rejection
@ -604,7 +615,7 @@ public class TestAMRMClient {
} }
if(iterationsLeft > 0) { if(iterationsLeft > 0) {
// sleep to make sure NM's heartbeat // sleep to make sure NM's heartbeat
sleep(1000); sleep(100);
} }
} }
assertTrue(amClient.ask.size() == 0); assertTrue(amClient.ask.size() == 0);

View File

@ -24,7 +24,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
@ -53,7 +52,7 @@ public class TestAMRMClientContainerRequest {
Resource capability = Resource.newInstance(1024, 1); Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request = ContainerRequest request =
new ContainerRequest(capability, new String[] {"host1", "host2"}, new ContainerRequest(capability, new String[] {"host1", "host2"},
new String[] {"/rack2"}, Priority.newInstance(1), 4); new String[] {"/rack2"}, Priority.newInstance(1));
client.addContainerRequest(request); client.addContainerRequest(request);
verifyResourceRequest(client, request, "host1", true); verifyResourceRequest(client, request, "host1", true);
verifyResourceRequest(client, request, "host2", true); verifyResourceRequest(client, request, "host2", true);
@ -75,7 +74,7 @@ public class TestAMRMClientContainerRequest {
Resource capability = Resource.newInstance(1024, 1); Resource capability = Resource.newInstance(1024, 1);
ContainerRequest nodeLevelRequest = ContainerRequest nodeLevelRequest =
new ContainerRequest(capability, new String[] {"host1", "host2"}, new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false); null, Priority.newInstance(1), false);
client.addContainerRequest(nodeLevelRequest); client.addContainerRequest(nodeLevelRequest);
verifyResourceRequest(client, nodeLevelRequest, ResourceRequest.ANY, false); verifyResourceRequest(client, nodeLevelRequest, ResourceRequest.ANY, false);
@ -87,12 +86,12 @@ public class TestAMRMClientContainerRequest {
// same priority // same priority
ContainerRequest nodeLevelRequest2 = ContainerRequest nodeLevelRequest2 =
new ContainerRequest(capability, new String[] {"host2", "host3"}, new ContainerRequest(capability, new String[] {"host2", "host3"},
null, Priority.newInstance(1), 4, false); null, Priority.newInstance(1), false);
client.addContainerRequest(nodeLevelRequest2); client.addContainerRequest(nodeLevelRequest2);
AMRMClient.ContainerRequest rackLevelRequest = AMRMClient.ContainerRequest rackLevelRequest =
new AMRMClient.ContainerRequest(capability, null, new AMRMClient.ContainerRequest(capability, null,
new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), 3, false); new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), false);
client.addContainerRequest(rackLevelRequest); client.addContainerRequest(rackLevelRequest);
verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY, false); verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY, false);
@ -103,13 +102,13 @@ public class TestAMRMClientContainerRequest {
// same priority // same priority
AMRMClient.ContainerRequest rackLevelRequest2 = AMRMClient.ContainerRequest rackLevelRequest2 =
new AMRMClient.ContainerRequest(capability, null, new AMRMClient.ContainerRequest(capability, null,
new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), 3, false); new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), false);
client.addContainerRequest(rackLevelRequest2); client.addContainerRequest(rackLevelRequest2);
ContainerRequest bothLevelRequest = ContainerRequest bothLevelRequest =
new ContainerRequest(capability, new String[] {"host3", "host4"}, new ContainerRequest(capability, new String[] {"host3", "host4"},
new String[] {"rack1", "/otherrack"}, new String[] {"rack1", "/otherrack"},
Priority.newInstance(3), 4, false); Priority.newInstance(3), false);
client.addContainerRequest(bothLevelRequest); client.addContainerRequest(bothLevelRequest);
verifyResourceRequest(client, bothLevelRequest, ResourceRequest.ANY, false); verifyResourceRequest(client, bothLevelRequest, ResourceRequest.ANY, false);
@ -125,7 +124,7 @@ public class TestAMRMClientContainerRequest {
ContainerRequest bothLevelRequest2 = ContainerRequest bothLevelRequest2 =
new ContainerRequest(capability, new String[] {"host4", "host5"}, new ContainerRequest(capability, new String[] {"host4", "host5"},
new String[] {"rack1", "/otherrack2"}, new String[] {"rack1", "/otherrack2"},
Priority.newInstance(3), 4, false); Priority.newInstance(3), false);
client.addContainerRequest(bothLevelRequest2); client.addContainerRequest(bothLevelRequest2);
} }
@ -142,11 +141,11 @@ public class TestAMRMClientContainerRequest {
Resource capability = Resource.newInstance(1024, 1); Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request1 = ContainerRequest request1 =
new ContainerRequest(capability, new String[] {"host1", "host2"}, new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false); null, Priority.newInstance(1), false);
client.addContainerRequest(request1); client.addContainerRequest(request1);
ContainerRequest request2 = ContainerRequest request2 =
new ContainerRequest(capability, new String[] {"host3"}, new ContainerRequest(capability, new String[] {"host3"},
null, Priority.newInstance(1), 4, true); null, Priority.newInstance(1), true);
client.addContainerRequest(request2); client.addContainerRequest(request2);
} }
@ -163,28 +162,28 @@ public class TestAMRMClientContainerRequest {
Resource capability = Resource.newInstance(1024, 1); Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request1 = ContainerRequest request1 =
new ContainerRequest(capability, new String[] {"host1", "host2"}, new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false); null, Priority.newInstance(1), false);
client.addContainerRequest(request1); client.addContainerRequest(request1);
client.removeContainerRequest(request1); client.removeContainerRequest(request1);
ContainerRequest request2 = ContainerRequest request2 =
new ContainerRequest(capability, new String[] {"host3"}, new ContainerRequest(capability, new String[] {"host3"},
null, Priority.newInstance(1), 4, true); null, Priority.newInstance(1), true);
client.addContainerRequest(request2); client.addContainerRequest(request2);
client.removeContainerRequest(request2); client.removeContainerRequest(request2);
ContainerRequest request3 = ContainerRequest request3 =
new ContainerRequest(capability, new String[] {"host1", "host2"}, new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false); null, Priority.newInstance(1), false);
client.addContainerRequest(request3); client.addContainerRequest(request3);
client.removeContainerRequest(request3); client.removeContainerRequest(request3);
ContainerRequest request4 = ContainerRequest request4 =
new ContainerRequest(capability, null, new ContainerRequest(capability, null,
new String[] {"rack1"}, Priority.newInstance(1), 4, true); new String[] {"rack1"}, Priority.newInstance(1), true);
client.addContainerRequest(request4); client.addContainerRequest(request4);
} }
@ -202,11 +201,11 @@ public class TestAMRMClientContainerRequest {
Resource capability = Resource.newInstance(1024, 1); Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request1 = ContainerRequest request1 =
new ContainerRequest(capability, new String[] {"host1", "host2"}, new ContainerRequest(capability, new String[] {"host1", "host2"},
null, Priority.newInstance(1), 4, false); null, Priority.newInstance(1), false);
client.addContainerRequest(request1); client.addContainerRequest(request1);
ContainerRequest request2 = ContainerRequest request2 =
new ContainerRequest(capability, null, new ContainerRequest(capability, null,
new String[] {"rack1"}, Priority.newInstance(1), 4, true); new String[] {"rack1"}, Priority.newInstance(1), true);
client.addContainerRequest(request2); client.addContainerRequest(request2);
} }
@ -227,7 +226,7 @@ public class TestAMRMClientContainerRequest {
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority()) ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
.get(location).get(request.getCapability()).remoteRequest; .get(location).get(request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName()); assertEquals(location, ask.getResourceName());
assertEquals(request.getContainerCount(), ask.getNumContainers()); assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
} }
} }

View File

@ -29,7 +29,6 @@ import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMClient;
@ -220,7 +218,7 @@ public class TestNMClient {
for (int i = 0; i < num; ++i) { for (int i = 0; i < num; ++i) {
rmClient.addContainerRequest(new ContainerRequest(capability, nodes, rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority, 1)); racks, priority));
} }
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority) int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)