diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2a544ba35d0..7e07607b92a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -156,6 +156,9 @@ Release 2.6.0 - UNRELEASED YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda Tan via vinodkv) + YARN-2501. Enhanced AMRMClient library to support requests against node + labels. (Wangda Tan via vinodkv) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index f41c018cea4..6f8c65a9fa2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -105,6 +105,7 @@ public abstract class AMRMClient extends final List racks; final Priority priority; final boolean relaxLocality; + final String nodeLabelsExpression; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -124,9 +125,9 @@ public abstract class AMRMClient extends */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority) { - this(capability, nodes, racks, priority, true); + this(capability, nodes, racks, priority, true, null); } - + /** * Instantiates a {@link ContainerRequest} with the given constraints. * @@ -147,6 +148,32 @@ public abstract class AMRMClient extends */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality) { + this(capability, nodes, racks, priority, relaxLocality, null); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression + * Set node labels to allocate resource + */ + public ContainerRequest(Resource capability, String[] nodes, + String[] racks, Priority priority, boolean relaxLocality, + String nodeLabelsExpression) { // Validate request Preconditions.checkArgument(capability != null, "The Resource to be requested for each container " + @@ -163,6 +190,7 @@ public abstract class AMRMClient extends this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); this.priority = priority; this.relaxLocality = relaxLocality; + this.nodeLabelsExpression = nodeLabelsExpression; } public Resource getCapability() { @@ -185,6 +213,10 @@ public abstract class AMRMClient extends return relaxLocality; } + public String getNodeLabelExpression() { + return nodeLabelsExpression; + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 88b2f456a89..e5e32e95195 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -251,7 +251,7 @@ public class AMRMClientImpl extends AMRMClient { // RPC layer is using it to send info across askList.add(ResourceRequest.newInstance(r.getPriority(), r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality())); + r.getRelaxLocality(), r.getNodeLabelExpression())); } releaseList = new ArrayList(release); // optimistically clear this collection assuming no RPC failure @@ -436,25 +436,25 @@ public class AMRMClientImpl extends AMRMClient { } for (String node : dedupedNodes) { addResourceRequest(req.getPriority(), node, req.getCapability(), req, - true); + true, req.getNodeLabelExpression()); } } for (String rack : dedupedRacks) { addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - true); + true, req.getNodeLabelExpression()); } // Ensure node requests are accompanied by requests for // corresponding rack for (String rack : inferredRacks) { addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - req.getRelaxLocality()); + req.getRelaxLocality(), req.getNodeLabelExpression()); } // Off-switch addResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getCapability(), req, req.getRelaxLocality()); + req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression()); } @Override @@ -608,8 +608,10 @@ public class AMRMClientImpl extends AMRMClient { ask.add(remoteRequest); } - private void addResourceRequest(Priority priority, String resourceName, - Resource capability, T req, boolean relaxLocality) { + private void + addResourceRequest(Priority priority, String resourceName, + Resource capability, T req, boolean relaxLocality, + String labelExpression) { Map> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { @@ -642,6 +644,8 @@ public class AMRMClientImpl extends AMRMClient { if (relaxLocality) { resourceRequestInfo.containerRequests.add(req); } + + resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); // Note this down for next interaction with ResourceManager addResourceRequestToAsk(resourceRequestInfo.remoteRequest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 4921452375e..35e6635e320 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import com.google.common.base.Supplier; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -147,6 +148,7 @@ public class TestAMRMClient { racks = new String[]{ rack }; } + @SuppressWarnings("deprecation") @Before public void startApp() throws Exception { // submit new app @@ -667,6 +669,28 @@ public class TestAMRMClient { } } } + + @Test(timeout=30000) + public void testAskWithNodeLabels() { + AMRMClientImpl client = + new AMRMClientImpl(); + + // add x, y to ANY + client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, + 1), null, null, Priority.UNDEFINED, true, "x && y")); + Assert.assertEquals(1, client.ask.size()); + Assert.assertEquals("x && y", client.ask.iterator().next() + .getNodeLabelExpression()); + + // add x, y and a, b to ANY, only a, b should be kept + client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, + 1), null, null, Priority.UNDEFINED, true, "x && y")); + client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, + 1), null, null, Priority.UNDEFINED, true, "a && b")); + Assert.assertEquals(1, client.ask.size()); + Assert.assertEquals("a && b", client.ask.iterator().next() + .getNodeLabelExpression()); + } private void testAllocation(final AMRMClientImpl amClient) throws YarnException, IOException {