From 3164e7d83875aa6b7435d1dfe61ac280aa277f1c Mon Sep 17 00:00:00 2001
From: Wangda Tan
Date: Wed, 27 May 2015 14:26:03 -0700
Subject: [PATCH] MAPREDUCE-6304. Specifying node labels when submitting MR
 jobs. (Naganarasimha G R via wangda)

---
 hadoop-mapreduce-project/CHANGES.txt          |  3 +
 .../v2/app/rm/RMContainerAllocator.java       | 19 +++-
 .../v2/app/rm/RMContainerRequestor.java       | 32 ++++---
 .../v2/app/rm/TestRMContainerAllocator.java   | 89 ++++++++++++++++++-
 .../apache/hadoop/mapreduce/MRJobConfig.java  | 20 +++++
 .../src/main/resources/mapred-default.xml     | 35 ++++++++
 .../org/apache/hadoop/mapred/YARNRunner.java  | 30 ++++++-
 .../apache/hadoop/mapred/TestYARNRunner.java  | 16 ++++
 8 files changed, 226 insertions(+), 18 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 40ef0370272..c151dbb9dab 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -270,6 +270,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6364. Add a "Kill" link to Task Attempts page. (Ryu Kobayashi
     via ozawa)
 
+    MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
+    (Naganarasimha G R via wangda)
+
   IMPROVEMENTS
 
     MAPREDUCE-6291. Correct mapred queue usage command.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 8cdcaa840bf..1aeee2c9865 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -180,6 +180,10 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   private ScheduleStats scheduleStats = new ScheduleStats();
 
+  private String mapNodeLabelExpression;
+
+  private String reduceNodeLabelExpression;
+
   public RMContainerAllocator(ClientService clientService, AppContext context,
       AMPreemptionPolicy preemptionPolicy) {
     super(clientService, context);
@@ -210,6 +214,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
         MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
+    reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
     // Init startTime to current time. If all goes well, it will be reset after
     // first attempt to contact RM.
     retrystartTime = System.currentTimeMillis();
@@ -396,9 +402,11 @@ public class RMContainerAllocator extends RMContainerRequestor
           reduceResourceRequest.getVirtualCores());
       if (reqEvent.getEarlierAttemptFailed()) {
         //add to the front of queue for fail fast
-        pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+        pendingReduces.addFirst(new ContainerRequest(reqEvent,
+            PRIORITY_REDUCE, reduceNodeLabelExpression));
       } else {
-        pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+        pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
+            reduceNodeLabelExpression));
         //reduces are added to pending and are slowly ramped up
       }
     }
@@ -951,7 +959,9 @@ public class RMContainerAllocator extends RMContainerRequestor
 
       if (event.getEarlierAttemptFailed()) {
         earlierFailedMaps.add(event.getAttemptID());
-        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
+        request =
+            new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP,
+                mapNodeLabelExpression);
         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
       } else {
         for (String host : event.getHosts()) {
@@ -976,7 +986,8 @@ public class RMContainerAllocator extends RMContainerRequestor
             LOG.debug("Added attempt req to rack " + rack);
           }
         }
-        request = new ContainerRequest(event, PRIORITY_MAP);
+        request =
+            new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
       }
       maps.put(event.getAttemptID(), request);
       addContainerReq(request);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 166686485f6..155711fe07b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -121,39 +121,43 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
+    final String nodeLabelExpression;
+
     /**
      * the time when this request object was formed; can be used to avoid
      * aggressive preemption for recently placed requests
      */
     final long requestTimeMs;
 
-    public ContainerRequest(ContainerRequestEvent event, Priority priority) {
+    public ContainerRequest(ContainerRequestEvent event, Priority priority,
+        String nodeLabelExpression) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority);
+          event.getRacks(), priority, nodeLabelExpression);
     }
 
     public ContainerRequest(ContainerRequestEvent event, Priority priority,
         long requestTimeMs) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority, requestTimeMs);
+          event.getRacks(), priority, requestTimeMs, null);
     }
 
     public ContainerRequest(TaskAttemptId attemptID,
         Resource capability, String[] hosts, String[] racks,
-        Priority priority) {
+        Priority priority, String nodeLabelExpression) {
       this(attemptID, capability, hosts, racks, priority,
-          System.currentTimeMillis());
+          System.currentTimeMillis(), nodeLabelExpression);
     }
 
     public ContainerRequest(TaskAttemptId attemptID,
         Resource capability, String[] hosts, String[] racks,
-        Priority priority, long requestTimeMs) {
+        Priority priority, long requestTimeMs, String nodeLabelExpression) {
       this.attemptID = attemptID;
       this.capability = capability;
       this.hosts = hosts;
       this.racks = racks;
       this.priority = priority;
       this.requestTimeMs = requestTimeMs;
+      this.nodeLabelExpression = nodeLabelExpression;
     }
 
     public String toString() {
@@ -390,17 +394,20 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     for (String host : req.hosts) {
       // Data-local
       if (!isNodeBlacklisted(host)) {
-        addResourceRequest(req.priority, host, req.capability);
-      }
+        addResourceRequest(req.priority, host, req.capability,
+            null);
+      }
     }
 
     // Nothing Rack-local for now
     for (String rack : req.racks) {
-      addResourceRequest(req.priority, rack, req.capability);
+      addResourceRequest(req.priority, rack, req.capability,
+          null);
     }
 
     // Off-switch
-    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
+    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+        req.nodeLabelExpression);
   }
 
   protected void decContainerReq(ContainerRequest req) {
@@ -417,7 +424,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability) {
+      Resource capability, String nodeLabelExpression) {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -439,6 +446,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       remoteRequest.setResourceName(resourceName);
       remoteRequest.setCapability(capability);
       remoteRequest.setNumContainers(0);
+      remoteRequest.setNodeLabelExpression(nodeLabelExpression);
       reqMap.put(capability, remoteRequest);
     }
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@@ -533,7 +541,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     }
     String[] hosts = newHosts.toArray(new String[newHosts.size()]);
     ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
-        hosts, orig.racks, orig.priority);
+        hosts, orig.racks, orig.priority, orig.nodeLabelExpression);
     return newReq;
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 3b7a4323238..a096a68fd22 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -491,7 +492,7 @@
     ContainerRequestEvent event1 =
         createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null));
+        new RMContainerRequestor.ContainerRequest(event1, null, null));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -561,6 +562,91 @@
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test
+  public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
+
+    LOG.info("Running testMapReduceAllocationWithNodeLabelExpression");
+    Configuration conf = new Configuration();
+    /*
+     * final int MAP_LIMIT = 3; final int REDUCE_LIMIT = 1;
+     * conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+     * conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+     */
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes");
+    conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
+          @Override
+          protected void register() {
+          }
+
+          @Override
+          protected ApplicationMasterProtocol createSchedulerProxy() {
+            return mockScheduler;
+          }
+        };
+
+    // create some map requests
+    ContainerRequestEvent reqMapEvents;
+    reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+    // create some reduce requests
+    ContainerRequestEvent reqReduceEvents;
+    reqReduceEvents =
+        createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+    // verify all of the host-specific asks were sent plus one for the
+    // default rack and one for the ANY request
+    Assert.assertEquals(3, mockScheduler.lastAsk.size());
+    // verify ResourceRequest sent for MAP have appropriate node
+    // label expression as per the configuration
+    validateLabelsRequests(mockScheduler.lastAsk.get(0), false);
+    validateLabelsRequests(mockScheduler.lastAsk.get(1), false);
+    validateLabelsRequests(mockScheduler.lastAsk.get(2), false);
+
+    // assign a map task and verify we do not ask for any more maps
+    ContainerId cid0 = mockScheduler.assignContainer("map", false);
+    allocator.schedule();
+    // default rack and one for the ANY request
+    Assert.assertEquals(3, mockScheduler.lastAsk.size());
+    validateLabelsRequests(mockScheduler.lastAsk.get(0), true);
+    validateLabelsRequests(mockScheduler.lastAsk.get(1), true);
+    validateLabelsRequests(mockScheduler.lastAsk.get(2), true);
+
+    // complete the map task and verify that we ask for one more
+    allocator.close();
+  }
+
+  private void validateLabelsRequests(ResourceRequest resourceRequest,
+      boolean isReduce) {
+    switch (resourceRequest.getResourceName()) {
+    case "map":
+    case "reduce":
+    case NetworkTopology.DEFAULT_RACK:
+      Assert.assertNull(resourceRequest.getNodeLabelExpression());
+      break;
+    case "*":
+      Assert.assertEquals(isReduce ? "ReduceNodes" : "MapNodes",
+          resourceRequest.getNodeLabelExpression());
+      break;
+    default:
+      Assert.fail("Invalid resource location "
+          + resourceRequest.getResourceName());
+    }
+  }
+
   @Test
   public void testMapReduceScheduling() throws Exception {
 
@@ -1498,6 +1584,7 @@
             .getNumContainers(), req.getRelaxLocality());
         askCopy.add(reqCopy);
       }
+      SecurityUtil.setTokenServiceUseIp(false);
       lastAsk = ask;
       lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index fb4064c2270..59b887dbf33 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -71,6 +71,26 @@ public interface MRJobConfig {
 
   public static final String QUEUE_NAME = "mapreduce.job.queuename";
 
+  /**
+   * Node Label expression applicable for all Job containers.
+   */
+  public static final String JOB_NODE_LABEL_EXP = "mapreduce.job.node-label-expression";
+
+  /**
+   * Node Label expression applicable for AM containers.
+   */
+  public static final String AM_NODE_LABEL_EXP = "mapreduce.job.am.node-label-expression";
+
+  /**
+   * Node Label expression applicable for map containers.
+   */
+  public static final String MAP_NODE_LABEL_EXP = "mapreduce.map.node-label-expression";
+
+  /**
+   * Node Label expression applicable for reduce containers.
+   */
+  public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression";
+
   public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
 
   public static final String JOB_TAGS = "mapreduce.job.tags";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 5daf66df078..cd7c9b2fed9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1112,6 +1112,41 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.node-label-expression</name>
+  <description>All the containers of the Map Reduce job will be run with this
+  node label expression. If the node-label-expression for job is not set, then
+  it will use queue's default-node-label-expression for all job's containers.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.am.node-label-expression</name>
+  <description>This is node-label configuration for Map Reduce Application Master
+  container. If not configured it will make use of
+  mapreduce.job.node-label-expression and if job's node-label expression is not
+  configured then it will use queue's default-node-label-expression.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.map.node-label-expression</name>
+  <description>This is node-label configuration for Map task containers. If not
+  configured it will use mapreduce.job.node-label-expression and if job's
+  node-label expression is not configured then it will use queue's
+  default-node-label-expression.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.reduce.node-label-expression</name>
+  <description>This is node-label configuration for Reduce task containers. If
+  not configured it will use mapreduce.job.node-label-expression and if job's
+  node-label expression is not configured then it will use queue's
+  default-node-label-expression.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.job.counters.limit</name>
   <value>120</value>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 8e576076125..2bb2483f1df 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -76,8 +76,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -97,7 +99,15 @@ public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
 
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private final static RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  public final static Priority AM_CONTAINER_PRIORITY = recordFactory
+      .newRecordInstance(Priority.class);
+  static {
+    AM_CONTAINER_PRIORITY.setPriority(0);
+  }
+
   private ResourceMgrDelegate resMgrDelegate;
   private ClientCache clientCache;
   private Configuration conf;
@@ -525,6 +535,24 @@ public class YARNRunner implements ClientProtocol {
         conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
             MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
     appContext.setResource(capability);
+
+    // set labels for the AM container request if present
+    String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
+    if (null != amNodelabelExpression
+        && amNodelabelExpression.trim().length() != 0) {
+      ResourceRequest amResourceRequest =
+          recordFactory.newRecordInstance(ResourceRequest.class);
+      amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
+      amResourceRequest.setResourceName(ResourceRequest.ANY);
+      amResourceRequest.setCapability(capability);
+      amResourceRequest.setNumContainers(1);
+      amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
+      appContext.setAMContainerResourceRequest(amResourceRequest);
+    }
+    // set labels for the Job containers
+    appContext.setNodeLabelExpression(jobConf
+        .get(JobContext.JOB_NODE_LABEL_EXP));
+
     appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
     if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
       appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index c427975e353..0e53ab0dbdf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -546,6 +546,22 @@ public class TestYARNRunner extends TestCase {
     throw new IllegalStateException("Profiler opts not found!");
   }
 
+  @Test
+  public void testNodeLabelExp() throws Exception {
+    JobConf jobConf = new JobConf();
+
+    jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
+    jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");
+
+    YARNRunner yarnRunner = new YARNRunner(jobConf);
+    ApplicationSubmissionContext appSubCtx =
+        buildSubmitContext(yarnRunner, jobConf);
+
+    assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
+    assertEquals(appSubCtx.getAMContainerResourceRequest()
+        .getNodeLabelExpression(), "highMem");
+  }
+
   @Test
   public void testAMStandardEnv() throws Exception {
     final String ADMIN_LIB_PATH = "foo";