YARN-2501. Enhanced AMRMClient library to support requests against node labels. Contributed by Wangda Tan.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-10-10 19:57:39 -07:00
parent 428dfaad60
commit a5ec3d0809
4 changed files with 72 additions and 9 deletions

View File

@ -156,6 +156,9 @@ Release 2.6.0 - UNRELEASED
YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda
Tan via vinodkv)
YARN-2501. Enhanced AMRMClient library to support requests against node
labels. (Wangda Tan via vinodkv)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -105,6 +105,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
final List<String> racks;
final Priority priority;
final boolean relaxLocality;
final String nodeLabelsExpression;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
@ -124,9 +125,9 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
*/
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority) {
this(capability, nodes, racks, priority, true);
this(capability, nodes, racks, priority, true, null);
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
@ -147,6 +148,32 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
*/
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, boolean relaxLocality) {
this(capability, nodes, racks, priority, relaxLocality, null);
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
* @param nodeLabelsExpression
* Set node labels to allocate resource
*/
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, boolean relaxLocality,
String nodeLabelsExpression) {
// Validate request
Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " +
@ -163,6 +190,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority;
this.relaxLocality = relaxLocality;
this.nodeLabelsExpression = nodeLabelsExpression;
}
public Resource getCapability() {
@ -185,6 +213,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return relaxLocality;
}
public String getNodeLabelExpression() {
return nodeLabelsExpression;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");

View File

@ -251,7 +251,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// RPC layer is using it to send info across
askList.add(ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality()));
r.getRelaxLocality(), r.getNodeLabelExpression()));
}
releaseList = new ArrayList<ContainerId>(release);
// optimistically clear this collection assuming no RPC failure
@ -436,25 +436,25 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
for (String node : dedupedNodes) {
addResourceRequest(req.getPriority(), node, req.getCapability(), req,
true);
true, req.getNodeLabelExpression());
}
}
for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
true);
true, req.getNodeLabelExpression());
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
req.getRelaxLocality());
req.getRelaxLocality(), req.getNodeLabelExpression());
}
// Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getCapability(), req, req.getRelaxLocality());
req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression());
}
@Override
@ -608,8 +608,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ask.add(remoteRequest);
}
private void addResourceRequest(Priority priority, String resourceName,
Resource capability, T req, boolean relaxLocality) {
private void
addResourceRequest(Priority priority, String resourceName,
Resource capability, T req, boolean relaxLocality,
String labelExpression) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
@ -642,6 +644,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
if (relaxLocality) {
resourceRequestInfo.containerRequests.add(req);
}
resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import com.google.common.base.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -147,6 +148,7 @@ public class TestAMRMClient {
racks = new String[]{ rack };
}
@SuppressWarnings("deprecation")
@Before
public void startApp() throws Exception {
// submit new app
@ -667,6 +669,28 @@ public class TestAMRMClient {
}
}
}
@Test(timeout=30000)
public void testAskWithNodeLabels() {
AMRMClientImpl<ContainerRequest> client =
new AMRMClientImpl<ContainerRequest>();
// add x, y to ANY
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "x && y"));
Assert.assertEquals(1, client.ask.size());
Assert.assertEquals("x && y", client.ask.iterator().next()
.getNodeLabelExpression());
// add x, y and a, b to ANY, only a, b should be kept
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "x && y"));
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "a && b"));
Assert.assertEquals(1, client.ask.size());
Assert.assertEquals("a && b", client.ask.iterator().next()
.getNodeLabelExpression());
}
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
throws YarnException, IOException {