From 3721cfe1fbd98c5b6aa46aefdfcf62276c28c4a4 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Fri, 21 Apr 2017 16:12:01 -0700 Subject: [PATCH] MAPREDUCE-6871. Allow users to specify racks and nodes for strict locality for AMs (rkanter) --- .../apache/hadoop/mapreduce/MRJobConfig.java | 6 + .../org/apache/hadoop/mapred/YARNRunner.java | 132 +++++++++++--- .../apache/hadoop/mapred/TestYARNRunner.java | 167 ++++++++++++++++++ 3 files changed, 279 insertions(+), 26 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 18bf1391fee..cfc1bcc72a6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -91,6 +91,12 @@ public interface MRJobConfig { */ public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression"; + /** + * Specify strict locality on a comma-separated list of racks and/or nodes. + * Syntax: /rack or /rack/node or node (assumes /default-rack) + */ + public static final String AM_STRICT_LOCALITY = "mapreduce.job.am.strict-locality"; + public static final String RESERVATION_ID = "mapreduce.job.reservation.id"; public static final String JOB_TAGS = "mapreduce.job.tags"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 2339c79f62b..1baa467cdeb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -22,13 +22,14 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Vector; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,6 +102,25 @@ public class YARNRunner implements ClientProtocol { private static final Log LOG = LogFactory.getLog(YARNRunner.class); + private static final String RACK_GROUP = "rack"; + private static final String NODE_IF_RACK_GROUP = "node1"; + private static final String NODE_IF_NO_RACK_GROUP = "node2"; + + /** + * Matches any of the following patterns with capturing groups: + * + * The groups can be retrieved using the RACK_GROUP, NODE_IF_RACK_GROUP, + * and/or NODE_IF_NO_RACK_GROUP group keys. + */ + private static final Pattern RACK_NODE_PATTERN = + Pattern.compile( + String.format("(?<%s>[^/]+?)|(?<%s>/[^/]+?)(?:/(?<%s>[^/]+?))?", + NODE_IF_NO_RACK_GROUP, RACK_GROUP, NODE_IF_RACK_GROUP)); + private final static RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); @@ -503,20 +523,6 @@ public class YARNRunner implements ClientProtocol { throws IOException { ApplicationId applicationId = resMgrDelegate.getApplicationId(); - // Setup resource requirements - Resource capability = recordFactory.newRecordInstance(Resource.class); - capability.setMemorySize( - conf.getInt( - MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB - ) - ); - capability.setVirtualCores( - conf.getInt( - MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES - ) - ); - LOG.debug("AppMaster capability = " + capability); - // Setup LocalResources Map localResources = setupLocalResources(jobConf, jobSubmitDir); @@ -577,21 +583,18 @@ public class YARNRunner implements ClientProtocol { appContext.setMaxAppAttempts( conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS)); - appContext.setResource(capability); - // set labels for the AM container request if present + // Setup the AM ResourceRequests + List amResourceRequests = generateResourceRequests(); + appContext.setAMContainerResourceRequests(amResourceRequests); + + // set labels for the AM container requests if present String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP); if (null != amNodelabelExpression && amNodelabelExpression.trim().length() != 0) { - ResourceRequest amResourceRequest = - recordFactory.newRecordInstance(ResourceRequest.class); - amResourceRequest.setPriority(AM_CONTAINER_PRIORITY); - amResourceRequest.setResourceName(ResourceRequest.ANY); - amResourceRequest.setCapability(capability); - amResourceRequest.setNumContainers(1); - amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim()); - appContext.setAMContainerResourceRequests( - Collections.singletonList(amResourceRequest)); + for (ResourceRequest amResourceRequest : amResourceRequests) { + amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim()); + } } // set labels for the Job containers appContext.setNodeLabelExpression(jobConf @@ -616,6 +619,83 @@ public class YARNRunner implements ClientProtocol { return appContext; } + private List generateResourceRequests() throws IOException { + Resource capability = recordFactory.newRecordInstance(Resource.class); + capability.setMemorySize( + conf.getInt( + MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB + ) + ); + capability.setVirtualCores( + conf.getInt( + MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES + ) + ); + if (LOG.isDebugEnabled()) { + LOG.debug("AppMaster capability = " + capability); + } + + List amResourceRequests = new ArrayList<>(); + // Always have an ANY request + ResourceRequest amAnyResourceRequest = + createAMResourceRequest(ResourceRequest.ANY, capability); + Map rackRequests = new HashMap<>(); + amResourceRequests.add(amAnyResourceRequest); + Collection amStrictResources = conf.getStringCollection( + MRJobConfig.AM_STRICT_LOCALITY); + for (String amStrictResource : amStrictResources) { + amAnyResourceRequest.setRelaxLocality(false); + Matcher matcher = RACK_NODE_PATTERN.matcher(amStrictResource); + if (matcher.matches()) { + String nodeName; + String rackName = matcher.group(RACK_GROUP); + if (rackName == null) { + rackName = "/default-rack"; + nodeName = matcher.group(NODE_IF_NO_RACK_GROUP); + } else { + nodeName = matcher.group(NODE_IF_RACK_GROUP); + } + ResourceRequest amRackResourceRequest = rackRequests.get(rackName); + if (amRackResourceRequest == null) { + amRackResourceRequest = createAMResourceRequest(rackName, capability); + amResourceRequests.add(amRackResourceRequest); + rackRequests.put(rackName, amRackResourceRequest); + } + if (nodeName != null) { + amRackResourceRequest.setRelaxLocality(false); + ResourceRequest amNodeResourceRequest = + createAMResourceRequest(nodeName, capability); + amResourceRequests.add(amNodeResourceRequest); + } + } else { + String errMsg = + "Invalid resource name: " + amStrictResource + " specified."; + LOG.warn(errMsg); + throw new IOException(errMsg); + } + } + if (LOG.isDebugEnabled()) { + for (ResourceRequest amResourceRequest : amResourceRequests) { + LOG.debug("ResourceRequest: resource = " + + amResourceRequest.getResourceName() + ", locality = " + + amResourceRequest.getRelaxLocality()); + } + } + return amResourceRequests; + } + + private ResourceRequest createAMResourceRequest(String resource, + Resource capability) { + ResourceRequest resourceRequest = + recordFactory.newRecordInstance(ResourceRequest.class); + resourceRequest.setPriority(AM_CONTAINER_PRIORITY); + resourceRequest.setResourceName(resource); + resourceRequest.setCapability(capability); + resourceRequest.setNumContainers(1); + resourceRequest.setRelaxLocality(true); + return resourceRequest; + } + private void setTokenRenewerConf(ContainerLaunchContext context, Configuration conf, String regex) throws IOException { DataOutputBuffer dob = new DataOutputBuffer(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index c2bda6235a0..bd3e524d44b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -40,6 +41,7 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -93,6 +95,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; @@ -575,6 +579,169 @@ public class TestYARNRunner { .getNodeLabelExpression(), "highMem"); } + @Test + public void testResourceRequestLocalityAny() throws Exception { + ResourceRequest amAnyResourceRequest = + createResourceRequest(ResourceRequest.ANY, true); + verifyResourceRequestLocality(null, null, amAnyResourceRequest); + verifyResourceRequestLocality(null, "label1", amAnyResourceRequest); + } + + @Test + public void testResourceRequestLocalityRack() throws Exception { + ResourceRequest amAnyResourceRequest = + createResourceRequest(ResourceRequest.ANY, false); + ResourceRequest amRackResourceRequest = + createResourceRequest("/rack1", true); + verifyResourceRequestLocality("/rack1", null, amAnyResourceRequest, + amRackResourceRequest); + verifyResourceRequestLocality("/rack1", "label1", amAnyResourceRequest, + amRackResourceRequest); + } + + @Test + public void testResourceRequestLocalityNode() throws Exception { + ResourceRequest amAnyResourceRequest = + createResourceRequest(ResourceRequest.ANY, false); + ResourceRequest amRackResourceRequest = + createResourceRequest("/rack1", false); + ResourceRequest amNodeResourceRequest = + createResourceRequest("node1", true); + verifyResourceRequestLocality("/rack1/node1", null, amAnyResourceRequest, + amRackResourceRequest, amNodeResourceRequest); + verifyResourceRequestLocality("/rack1/node1", "label1", + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest); + } + + @Test + public void testResourceRequestLocalityNodeDefaultRack() throws Exception { + ResourceRequest amAnyResourceRequest = + createResourceRequest(ResourceRequest.ANY, false); + ResourceRequest amRackResourceRequest = + createResourceRequest("/default-rack", false); + ResourceRequest amNodeResourceRequest = + createResourceRequest("node1", true); + verifyResourceRequestLocality("node1", null, + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest); + verifyResourceRequestLocality("node1", "label1", + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest); + } + + @Test + public void testResourceRequestLocalityMultipleNodes() throws Exception { + ResourceRequest amAnyResourceRequest = + createResourceRequest(ResourceRequest.ANY, false); + ResourceRequest amRackResourceRequest = + createResourceRequest("/rack1", false); + ResourceRequest amNodeResourceRequest = + createResourceRequest("node1", true); + ResourceRequest amNode2ResourceRequest = + createResourceRequest("node2", true); + verifyResourceRequestLocality("/rack1/node1,/rack1/node2", null, + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest, + amNode2ResourceRequest); + verifyResourceRequestLocality("/rack1/node1,/rack1/node2", "label1", + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest, + amNode2ResourceRequest); + } + + @Test + public void testResourceRequestLocalityMultipleNodesDifferentRack() + throws Exception { + ResourceRequest amAnyResourceRequest = + createResourceRequest(ResourceRequest.ANY, false); + ResourceRequest amRackResourceRequest = + createResourceRequest("/rack1", false); + ResourceRequest amNodeResourceRequest = + createResourceRequest("node1", true); + ResourceRequest amRack2ResourceRequest = + createResourceRequest("/rack2", false); + ResourceRequest amNode2ResourceRequest = + createResourceRequest("node2", true); + verifyResourceRequestLocality("/rack1/node1,/rack2/node2", null, + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest, + amRack2ResourceRequest, amNode2ResourceRequest); + verifyResourceRequestLocality("/rack1/node1,/rack2/node2", "label1", + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest, + amRack2ResourceRequest, amNode2ResourceRequest); + } + + @Test + public void testResourceRequestLocalityMultipleNodesDefaultRack() + throws Exception { + ResourceRequest amAnyResourceRequest = + createResourceRequest(ResourceRequest.ANY, false); + ResourceRequest amRackResourceRequest = + createResourceRequest("/rack1", false); + ResourceRequest amNodeResourceRequest = + createResourceRequest("node1", true); + ResourceRequest amRack2ResourceRequest = + createResourceRequest("/default-rack", false); + ResourceRequest amNode2ResourceRequest = + createResourceRequest("node2", true); + verifyResourceRequestLocality("/rack1/node1,node2", null, + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest, + amRack2ResourceRequest, amNode2ResourceRequest); + verifyResourceRequestLocality("/rack1/node1,node2", "label1", + amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest, + amRack2ResourceRequest, amNode2ResourceRequest); + } + + @Test + public void testResourceRequestLocalityInvalid() throws Exception { + try { + verifyResourceRequestLocality("rack/node1", null, + new ResourceRequest[]{}); + fail("Should have failed due to invalid resource but did not"); + } catch (IOException ioe) { + assertTrue(ioe.getMessage().contains("Invalid resource name")); + } + try { + verifyResourceRequestLocality("/rack/node1/blah", null, + new ResourceRequest[]{}); + fail("Should have failed due to invalid resource but did not"); + } catch (IOException ioe) { + assertTrue(ioe.getMessage().contains("Invalid resource name")); + } + } + + private void verifyResourceRequestLocality(String strictResource, + String label, ResourceRequest... expectedReqs) throws Exception { + JobConf jobConf = new JobConf(); + if (strictResource != null) { + jobConf.set(MRJobConfig.AM_STRICT_LOCALITY, strictResource); + } + if (label != null) { + jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, label); + for (ResourceRequest expectedReq : expectedReqs) { + expectedReq.setNodeLabelExpression(label); + } + } + + YARNRunner yarnRunner = new YARNRunner(jobConf); + ApplicationSubmissionContext appSubCtx = + buildSubmitContext(yarnRunner, jobConf); + assertEquals(Arrays.asList(expectedReqs), + appSubCtx.getAMContainerResourceRequests()); + } + + private ResourceRequest createResourceRequest(String name, + boolean relaxLocality) { + Resource capability = recordFactory.newRecordInstance(Resource.class); + capability.setMemorySize(MRJobConfig.DEFAULT_MR_AM_VMEM_MB); + capability.setVirtualCores(MRJobConfig.DEFAULT_MR_AM_CPU_VCORES); + + ResourceRequest req = + recordFactory.newRecordInstance(ResourceRequest.class); + req.setPriority(YARNRunner.AM_CONTAINER_PRIORITY); + req.setResourceName(name); + req.setCapability(capability); + req.setNumContainers(1); + req.setRelaxLocality(relaxLocality); + + return req; + } + @Test public void testAMStandardEnvWithDefaultLibPath() throws Exception { testAMStandardEnv(false);