MAPREDUCE-6304. Specifying node labels when submitting MR jobs. (Naganarasimha G R via wangda)

(cherry picked from commit 3164e7d838)

Conflicts:
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
This commit is contained in:
Wangda Tan 2015-05-27 14:26:03 -07:00
parent 6ed8989a6f
commit 7cd0874dfd
8 changed files with 226 additions and 18 deletions

View File

@ -12,6 +12,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6364. Add a "Kill" link to Task Attempts page. (Ryu Kobayashi
via ozawa)
MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
(Naganarasimha G R via wangda)
IMPROVEMENTS
MAPREDUCE-6291. Correct mapred queue usage command.

View File

@ -176,6 +176,10 @@ public class RMContainerAllocator extends RMContainerRequestor
private ScheduleStats scheduleStats = new ScheduleStats();
private String mapNodeLabelExpression;
private String reduceNodeLabelExpression;
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
this.stopped = new AtomicBoolean(false);
@ -204,6 +208,8 @@ public class RMContainerAllocator extends RMContainerRequestor
RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
// Init startTime to current time. If all goes well, it will be reset after
// first attempt to contact RM.
retrystartTime = System.currentTimeMillis();
@ -390,9 +396,11 @@ public class RMContainerAllocator extends RMContainerRequestor
reduceResourceRequest.getVirtualCores());
if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
pendingReduces.addFirst(new ContainerRequest(reqEvent,
PRIORITY_REDUCE, reduceNodeLabelExpression));
} else {
pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
reduceNodeLabelExpression));
//reduces are added to pending and are slowly ramped up
}
}
@ -931,7 +939,9 @@ public class RMContainerAllocator extends RMContainerRequestor
if (event.getEarlierAttemptFailed()) {
earlierFailedMaps.add(event.getAttemptID());
request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
request =
new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP,
mapNodeLabelExpression);
LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
} else {
for (String host : event.getHosts()) {
@ -956,7 +966,8 @@ public class RMContainerAllocator extends RMContainerRequestor
LOG.debug("Added attempt req to rack " + rack);
}
}
request = new ContainerRequest(event, PRIORITY_MAP);
request =
new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
}
maps.put(event.getAttemptID(), request);
addContainerReq(request);

View File

@ -121,39 +121,43 @@ public abstract class RMContainerRequestor extends RMCommunicator {
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;
final String nodeLabelExpression;
/**
* the time when this request object was formed; can be used to avoid
* aggressive preemption for recently placed requests
*/
final long requestTimeMs;
public ContainerRequest(ContainerRequestEvent event, Priority priority) {
public ContainerRequest(ContainerRequestEvent event, Priority priority,
String nodeLabelExpression) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority);
event.getRacks(), priority, nodeLabelExpression);
}
public ContainerRequest(ContainerRequestEvent event, Priority priority,
long requestTimeMs) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority, requestTimeMs);
event.getRacks(), priority, requestTimeMs,null);
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority) {
Priority priority, String nodeLabelExpression) {
this(attemptID, capability, hosts, racks, priority,
System.currentTimeMillis());
System.currentTimeMillis(), nodeLabelExpression);
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority, long requestTimeMs) {
Priority priority, long requestTimeMs,String nodeLabelExpression) {
this.attemptID = attemptID;
this.capability = capability;
this.hosts = hosts;
this.racks = racks;
this.priority = priority;
this.requestTimeMs = requestTimeMs;
this.nodeLabelExpression = nodeLabelExpression;
}
public String toString() {
@ -390,17 +394,20 @@ public abstract class RMContainerRequestor extends RMCommunicator {
for (String host : req.hosts) {
// Data-local
if (!isNodeBlacklisted(host)) {
addResourceRequest(req.priority, host, req.capability);
addResourceRequest(req.priority, host, req.capability,
null);
}
}
// Nothing Rack-local for now
for (String rack : req.racks) {
addResourceRequest(req.priority, rack, req.capability);
addResourceRequest(req.priority, rack, req.capability,
null);
}
// Off-switch
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.nodeLabelExpression);
}
protected void decContainerReq(ContainerRequest req) {
@ -417,7 +424,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
}
private void addResourceRequest(Priority priority, String resourceName,
Resource capability) {
Resource capability, String nodeLabelExpression) {
Map<String, Map<Resource, ResourceRequest>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
@ -439,6 +446,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
remoteRequest.setResourceName(resourceName);
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
remoteRequest.setNodeLabelExpression(nodeLabelExpression);
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@ -533,7 +541,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
}
String[] hosts = newHosts.toArray(new String[newHosts.size()]);
ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
hosts, orig.racks, orig.priority);
hosts, orig.racks, orig.priority, orig.nodeLabelExpression);
return newReq;
}

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -490,7 +491,7 @@ public class TestRMContainerAllocator {
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null));
new RMContainerRequestor.ContainerRequest(event1, null,null));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
@ -560,6 +561,91 @@ public class TestRMContainerAllocator {
assignedRequests.preemptionWaitingReduces.size());
}
@Test
public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
LOG.info("Running testMapReduceAllocationWithNodeLabelExpression");
Configuration conf = new Configuration();
/*
* final int MAP_LIMIT = 3; final int REDUCE_LIMIT = 1;
* conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
* conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
*/
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes");
conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator =
new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
@Override
protected void register() {
}
@Override
protected ApplicationMasterProtocol createSchedulerProxy() {
return mockScheduler;
}
};
// create some map requests
ContainerRequestEvent reqMapEvents;
reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests
ContainerRequestEvent reqReduceEvents;
reqReduceEvents =
createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule();
// verify all of the host-specific asks were sent plus one for the
// default rack and one for the ANY request
Assert.assertEquals(3, mockScheduler.lastAsk.size());
// verify ResourceRequest sent for MAP have appropriate node
// label expression as per the configuration
validateLabelsRequests(mockScheduler.lastAsk.get(0), false);
validateLabelsRequests(mockScheduler.lastAsk.get(1), false);
validateLabelsRequests(mockScheduler.lastAsk.get(2), false);
// assign a map task and verify we do not ask for any more maps
ContainerId cid0 = mockScheduler.assignContainer("map", false);
allocator.schedule();
// default rack and one for the ANY request
Assert.assertEquals(3, mockScheduler.lastAsk.size());
validateLabelsRequests(mockScheduler.lastAsk.get(0), true);
validateLabelsRequests(mockScheduler.lastAsk.get(1), true);
validateLabelsRequests(mockScheduler.lastAsk.get(2), true);
// complete the map task and verify that we ask for one more
allocator.close();
}
private void validateLabelsRequests(ResourceRequest resourceRequest,
boolean isReduce) {
switch (resourceRequest.getResourceName()) {
case "map":
case "reduce":
case NetworkTopology.DEFAULT_RACK:
Assert.assertNull(resourceRequest.getNodeLabelExpression());
break;
case "*":
Assert.assertEquals(isReduce ? "ReduceNodes" : "MapNodes",
resourceRequest.getNodeLabelExpression());
break;
default:
Assert.fail("Invalid resource location "
+ resourceRequest.getResourceName());
}
}
@Test
public void testMapReduceScheduling() throws Exception {
@ -1497,6 +1583,7 @@ public class TestRMContainerAllocator {
.getNumContainers(), req.getRelaxLocality());
askCopy.add(reqCopy);
}
SecurityUtil.setTokenServiceUseIp(false);
lastAsk = ask;
lastRelease = release;
lastBlacklistAdditions = blacklistAdditions;

View File

@ -68,6 +68,26 @@ public interface MRJobConfig {
public static final String QUEUE_NAME = "mapreduce.job.queuename";
/**
* Node Label expression applicable for all Job containers.
*/
public static final String JOB_NODE_LABEL_EXP = "mapreduce.job.node-label-expression";
/**
* Node Label expression applicable for AM containers.
*/
public static final String AM_NODE_LABEL_EXP = "mapreduce.job.am.node-label-expression";
/**
* Node Label expression applicable for map containers.
*/
public static final String MAP_NODE_LABEL_EXP = "mapreduce.map.node-label-expression";
/**
* Node Label expression applicable for reduce containers.
*/
public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression";
public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
public static final String JOB_TAGS = "mapreduce.job.tags";

View File

@ -1566,6 +1566,41 @@
<!-- MR YARN Application properties -->
<property>
<name>mapreduce.job.node-label-expression</name>
<description>All the containers of the Map Reduce job will be run with this
node label expression. If the node-label-expression for job is not set, then
it will use queue's default-node-label-expression for all job's containers.
</description>
</property>
<property>
<name>mapreduce.job.am.node-label-expression</name>
<description>This is node-label configuration for Map Reduce Application Master
container. If not configured it will make use of
mapreduce.job.node-label-expression and if job's node-label expression is not
configured then it will use queue's default-node-label-expression.
</description>
</property>
<property>
<name>mapreduce.map.node-label-expression</name>
<description>This is node-label configuration for Map task containers. If not
configured it will use mapreduce.job.node-label-expression and if job's
node-label expression is not configured then it will use queue's
default-node-label-expression.
</description>
</property>
<property>
<name>mapreduce.reduce.node-label-expression</name>
<description>This is node-label configuration for Reduce task containers. If
not configured it will use mapreduce.job.node-label-expression and if job's
node-label expression is not configured then it will use queue's
default-node-label-expression.
</description>
</property>
<property>
<name>mapreduce.job.counters.limit</name>
<value>120</value>

View File

@ -76,8 +76,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -97,7 +99,15 @@ public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
public final static Priority AM_CONTAINER_PRIORITY = recordFactory
.newRecordInstance(Priority.class);
static {
AM_CONTAINER_PRIORITY.setPriority(0);
}
private ResourceMgrDelegate resMgrDelegate;
private ClientCache clientCache;
private Configuration conf;
@ -525,6 +535,24 @@ public class YARNRunner implements ClientProtocol {
conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
appContext.setResource(capability);
// set labels for the AM container request if present
String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
if (null != amNodelabelExpression
&& amNodelabelExpression.trim().length() != 0) {
ResourceRequest amResourceRequest =
recordFactory.newRecordInstance(ResourceRequest.class);
amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
amResourceRequest.setResourceName(ResourceRequest.ANY);
amResourceRequest.setCapability(capability);
amResourceRequest.setNumContainers(1);
amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
appContext.setAMContainerResourceRequest(amResourceRequest);
}
// set labels for the Job containers
appContext.setNodeLabelExpression(jobConf
.get(JobContext.JOB_NODE_LABEL_EXP));
appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));

View File

@ -546,6 +546,22 @@ public class TestYARNRunner extends TestCase {
throw new IllegalStateException("Profiler opts not found!");
}
@Test
public void testNodeLabelExp() throws Exception {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx =
buildSubmitContext(yarnRunner, jobConf);
assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
assertEquals(appSubCtx.getAMContainerResourceRequest()
.getNodeLabelExpression(), "highMem");
}
@Test
public void testAMStandardEnv() throws Exception {
final String ADMIN_LIB_PATH = "foo";