diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index e933ef47206..ceaf106f367 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -88,6 +88,12 @@ public interface MRJobConfig {
*/
public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression";
+ /**
+ * Comma-separated racks and/or nodes to request, with strict locality, for
+ * the MR AM. Each entry is /rack, /rack/node, or node (assumes /default-rack).
+ */
+ public static final String AM_STRICT_LOCALITY = "mapreduce.job.am.strict-locality";
+
public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
public static final String JOB_TAGS = "mapreduce.job.tags";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 2339c79f62b..1baa467cdeb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -22,13 +22,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -101,6 +102,25 @@ public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+ private static final String RACK_GROUP = "rack";
+ private static final String NODE_IF_RACK_GROUP = "node1";
+ private static final String NODE_IF_NO_RACK_GROUP = "node2";
+
+ /**
+ * Matches one strict-locality entry in any of the following forms:
+ *
+ * - /rack (RACK_GROUP captures "/rack", leading slash included)
+ * - /rack/node (RACK_GROUP and NODE_IF_RACK_GROUP are both captured)
+ * - node (only NODE_IF_NO_RACK_GROUP is captured; no rack was given)
+ *
+ * A named group that did not take part in the match is returned as null by
+ * Matcher.group(String), so RACK_GROUP == null identifies the bare-node form.
+ */
+ private static final Pattern RACK_NODE_PATTERN =
+ Pattern.compile(
+ String.format("(?<%s>[^/]+?)|(?<%s>/[^/]+?)(?:/(?<%s>[^/]+?))?",
+ NODE_IF_NO_RACK_GROUP, RACK_GROUP, NODE_IF_RACK_GROUP));
+
private final static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
@@ -503,20 +523,6 @@ public class YARNRunner implements ClientProtocol {
throws IOException {
ApplicationId applicationId = resMgrDelegate.getApplicationId();
- // Setup resource requirements
- Resource capability = recordFactory.newRecordInstance(Resource.class);
- capability.setMemorySize(
- conf.getInt(
- MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
- )
- );
- capability.setVirtualCores(
- conf.getInt(
- MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
- )
- );
- LOG.debug("AppMaster capability = " + capability);
-
// Setup LocalResources
Map localResources =
setupLocalResources(jobConf, jobSubmitDir);
@@ -577,21 +583,18 @@ public class YARNRunner implements ClientProtocol {
appContext.setMaxAppAttempts(
conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
- appContext.setResource(capability);
- // set labels for the AM container request if present
+ // Setup the AM ResourceRequests (ANY plus any strict-locality racks/nodes)
+ List<ResourceRequest> amResourceRequests = generateResourceRequests();
+ appContext.setAMContainerResourceRequests(amResourceRequests);
+
+ // set labels for the AM container requests if present
String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
if (null != amNodelabelExpression
&& amNodelabelExpression.trim().length() != 0) {
- ResourceRequest amResourceRequest =
- recordFactory.newRecordInstance(ResourceRequest.class);
- amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
- amResourceRequest.setResourceName(ResourceRequest.ANY);
- amResourceRequest.setCapability(capability);
- amResourceRequest.setNumContainers(1);
- amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
- appContext.setAMContainerResourceRequests(
- Collections.singletonList(amResourceRequest));
+ for (ResourceRequest amResourceRequest : amResourceRequests) {
+ amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
+ }
}
// set labels for the Job containers
appContext.setNodeLabelExpression(jobConf
@@ -616,6 +619,83 @@ public class YARNRunner implements ClientProtocol {
return appContext;
}
+ /**
+  * Builds the AM container ResourceRequests: an ANY request, plus one for
+  * each rack and node listed in MRJobConfig.AM_STRICT_LOCALITY.
+  * @return the requests: ANY first, then racks and nodes as encountered
+  * @throws IOException if an entry is not /rack, /rack/node or node
+  */
+ private List<ResourceRequest> generateResourceRequests() throws IOException {
+   Resource capability = recordFactory.newRecordInstance(Resource.class);
+   capability.setMemorySize(conf.getInt(
+       MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
+   capability.setVirtualCores(conf.getInt(
+       MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES));
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("AppMaster capability = " + capability);
+   }
+
+   List<ResourceRequest> amResourceRequests = new ArrayList<>();
+   // Always have an ANY request
+   ResourceRequest amAnyResourceRequest =
+       createAMResourceRequest(ResourceRequest.ANY, capability);
+   amResourceRequests.add(amAnyResourceRequest);
+   Map<String, ResourceRequest> rackRequests = new HashMap<>();
+   Collection<String> amStrictResources = conf.getStringCollection(
+       MRJobConfig.AM_STRICT_LOCALITY);
+   for (String amStrictResource : amStrictResources) {
+     // Any strict entry stops the ANY request from relaxing locality
+     amAnyResourceRequest.setRelaxLocality(false);
+     // Trim so entries written as "a, b" are not rejected or misnamed
+     Matcher matcher = RACK_NODE_PATTERN.matcher(amStrictResource.trim());
+     if (!matcher.matches()) {
+       String errMsg =
+           "Invalid resource name: " + amStrictResource + " specified.";
+       LOG.warn(errMsg);
+       throw new IOException(errMsg);
+     }
+     String nodeName;
+     String rackName = matcher.group(RACK_GROUP);
+     if (rackName == null) {
+       rackName = "/default-rack";
+       nodeName = matcher.group(NODE_IF_NO_RACK_GROUP);
+     } else {
+       nodeName = matcher.group(NODE_IF_RACK_GROUP);
+     }
+     ResourceRequest amRackResourceRequest = rackRequests.get(rackName);
+     if (amRackResourceRequest == null) {
+       amRackResourceRequest = createAMResourceRequest(rackName, capability);
+       amResourceRequests.add(amRackResourceRequest);
+       rackRequests.put(rackName, amRackResourceRequest);
+     }
+     if (nodeName != null) {
+       // A node was named, so its rack request must not relax locality
+       amRackResourceRequest.setRelaxLocality(false);
+       amResourceRequests.add(createAMResourceRequest(nodeName, capability));
+     }
+   }
+   if (LOG.isDebugEnabled()) {
+     for (ResourceRequest amResourceRequest : amResourceRequests) {
+       LOG.debug("ResourceRequest: resource = "
+           + amResourceRequest.getResourceName() + ", locality = "
+           + amResourceRequest.getRelaxLocality());
+     }
+   }
+   return amResourceRequests;
+ }
+
+ /** Creates a relaxed-locality, single-container AM request for resource. */
+ private ResourceRequest createAMResourceRequest(String resource,
+     Resource capability) {
+   ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class);
+   req.setPriority(AM_CONTAINER_PRIORITY);
+   req.setResourceName(resource);
+   req.setCapability(capability);
+   req.setNumContainers(1);
+   req.setRelaxLocality(true);
+   return req;
+ }
+
private void setTokenRenewerConf(ContainerLaunchContext context,
Configuration conf, String regex) throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index cb355b5cadd..de629b03d82 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -39,6 +40,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -93,6 +95,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
@@ -575,6 +579,169 @@ public class TestYARNRunner {
.getNodeLabelExpression(), "highMem");
}
+ /** Without strict locality there is only a relaxed ANY request. */
+ @Test
+ public void testResourceRequestLocalityAny() throws Exception {
+   ResourceRequest any = createResourceRequest(ResourceRequest.ANY, true);
+   verifyResourceRequestLocality(null, null, any);
+   verifyResourceRequestLocality(null, "label1", any);
+ }
+
+ /**
+  * A rack-only entry ("/rack1") makes the ANY request strict while the rack
+  * request itself stays relaxed, both without and with a node label.
+  */
+ @Test
+ public void testResourceRequestLocalityRack() throws Exception {
+   ResourceRequest any = createResourceRequest(ResourceRequest.ANY, false);
+   ResourceRequest rack = createResourceRequest("/rack1", true);
+   verifyResourceRequestLocality("/rack1", null, any, rack);
+   verifyResourceRequestLocality("/rack1", "label1", any, rack);
+ }
+
+ /**
+  * A rack-qualified node ("/rack1/node1") makes both the ANY and the rack
+  * request strict; only the node request stays relaxed.
+  */
+ @Test
+ public void testResourceRequestLocalityNode() throws Exception {
+   ResourceRequest any = createResourceRequest(ResourceRequest.ANY, false);
+   ResourceRequest rack = createResourceRequest("/rack1", false);
+   ResourceRequest node = createResourceRequest("node1", true);
+
+   verifyResourceRequestLocality("/rack1/node1", null, any, rack, node);
+   verifyResourceRequestLocality("/rack1/node1", "label1", any, rack, node);
+ }
+
+ /**
+  * A bare node name ("node1") is expected under "/default-rack": the ANY and
+  * rack requests are strict, only the node request stays relaxed.
+  */
+ @Test
+ public void testResourceRequestLocalityNodeDefaultRack() throws Exception {
+   ResourceRequest any = createResourceRequest(ResourceRequest.ANY, false);
+   ResourceRequest rack = createResourceRequest("/default-rack", false);
+   ResourceRequest node = createResourceRequest("node1", true);
+
+   verifyResourceRequestLocality("node1", null, any, rack, node);
+   verifyResourceRequestLocality("node1", "label1", any, rack, node);
+ }
+
+ /**
+  * Two nodes on the same rack ("/rack1/node1,/rack1/node2") share a single
+  * strict rack request. The expected order is ANY, the rack, then each node
+  * in the order listed.
+  */
+ @Test
+ public void testResourceRequestLocalityMultipleNodes() throws Exception {
+   String strict = "/rack1/node1,/rack1/node2";
+   ResourceRequest any = createResourceRequest(ResourceRequest.ANY, false);
+   ResourceRequest rack = createResourceRequest("/rack1", false);
+   ResourceRequest node1 = createResourceRequest("node1", true);
+   ResourceRequest node2 = createResourceRequest("node2", true);
+
+   // Unlabelled first: the labelled run sets the label on these same objects
+   verifyResourceRequestLocality(strict, null, any, rack, node1, node2);
+   verifyResourceRequestLocality(strict, "label1", any, rack, node1, node2);
+ }
+
+ /**
+  * Nodes on different racks ("/rack1/node1,/rack2/node2") each get their own
+  * strict rack request, which is expected immediately before the first node
+  * request for that rack.
+  */
+ @Test
+ public void testResourceRequestLocalityMultipleNodesDifferentRack()
+     throws Exception {
+   String strict = "/rack1/node1,/rack2/node2";
+   ResourceRequest any = createResourceRequest(ResourceRequest.ANY, false);
+   ResourceRequest rack1 = createResourceRequest("/rack1", false);
+   ResourceRequest node1 = createResourceRequest("node1", true);
+   ResourceRequest rack2 = createResourceRequest("/rack2", false);
+   ResourceRequest node2 = createResourceRequest("node2", true);
+
+   verifyResourceRequestLocality(strict, null,
+       any, rack1, node1, rack2, node2);
+   verifyResourceRequestLocality(strict, "label1",
+       any, rack1, node1, rack2, node2);
+ }
+
+ /**
+  * A rack-qualified node mixed with a bare node ("/rack1/node1,node2"): the
+  * bare node is expected under a strict "/default-rack" request that follows
+  * the requests for the first entry.
+  */
+ @Test
+ public void testResourceRequestLocalityMultipleNodesDefaultRack()
+     throws Exception {
+   String strict = "/rack1/node1,node2";
+   ResourceRequest any = createResourceRequest(ResourceRequest.ANY, false);
+   ResourceRequest rack1 = createResourceRequest("/rack1", false);
+   ResourceRequest node1 = createResourceRequest("node1", true);
+   ResourceRequest defaultRack = createResourceRequest("/default-rack", false);
+   ResourceRequest node2 = createResourceRequest("node2", true);
+
+   verifyResourceRequestLocality(strict, null,
+       any, rack1, node1, defaultRack, node2);
+   verifyResourceRequestLocality(strict, "label1",
+       any, rack1, node1, defaultRack, node2);
+ }
+
+ /**
+  * Malformed strict-locality entries must fail with an IOException whose
+  * message contains "Invalid resource name".
+  */
+ @Test
+ public void testResourceRequestLocalityInvalid() throws Exception {
+   // "rack/node1" lacks the leading slash; "/rack/node1/blah" is too deep
+   for (String invalid : new String[] {"rack/node1", "/rack/node1/blah"}) {
+     try {
+       verifyResourceRequestLocality(invalid, null,
+           new ResourceRequest[]{});
+       fail("Should have failed due to invalid resource but did not");
+     } catch (IOException ioe) {
+       assertTrue(ioe.getMessage().contains("Invalid resource name"));
+     }
+   }
+ }
+
+ /** Checks the AM requests built for a strict locality and label (null = unset). */
+ private void verifyResourceRequestLocality(String strictResource,
+     String label, ResourceRequest... expectedReqs) throws Exception {
+   JobConf conf = new JobConf();
+   if (strictResource != null) {
+     conf.set(MRJobConfig.AM_STRICT_LOCALITY, strictResource);
+   }
+   if (label != null) {
+     conf.set(MRJobConfig.AM_NODE_LABEL_EXP, label);
+     // The expected requests must carry the label as well
+     for (ResourceRequest expected : expectedReqs) {
+       expected.setNodeLabelExpression(label);
+     }
+   }
+   ApplicationSubmissionContext ctx =
+       buildSubmitContext(new YARNRunner(conf), conf);
+   assertEquals(Arrays.asList(expectedReqs),
+       ctx.getAMContainerResourceRequests());
+ }
+
+ /** Builds an expected AM request for name using the default AM capability. */
+ private ResourceRequest createResourceRequest(String name,
+     boolean relaxLocality) {
+   Resource amDefaults = recordFactory.newRecordInstance(Resource.class);
+   amDefaults.setMemorySize(MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
+   amDefaults.setVirtualCores(MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
+
+   ResourceRequest expected =
+       recordFactory.newRecordInstance(ResourceRequest.class);
+   expected.setPriority(YARNRunner.AM_CONTAINER_PRIORITY);
+   expected.setResourceName(name);
+   expected.setCapability(amDefaults);
+   expected.setNumContainers(1);
+   expected.setRelaxLocality(relaxLocality);
+   return expected;
+ }
+
@Test
public void testAMStandardEnvWithDefaultLibPath() throws Exception {
testAMStandardEnv(false);