MAPREDUCE-6765. MR should not schedule container requests in cases where reducer or mapper containers demand resource larger than the maximum supported (haibochen via rkanter)

This commit is contained in:
Robert Kanter 2016-11-01 20:47:25 -07:00
parent 1b6ecaf016
commit fc2b69eba1
2 changed files with 195 additions and 72 deletions

View File

@ -370,76 +370,16 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
@SuppressWarnings({ "unchecked" })
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
recalculateReduceSchedule = true;
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
JobId jobId = getJob().getID();
Resource supportedMaxContainerCapability = getMaxContainerCapability();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
if (mapResourceRequest.equals(Resources.none())) {
mapResourceRequest = reqEvent.getCapability();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
.getMemorySize())));
LOG.info("mapResourceRequest:" + mapResourceRequest);
if (mapResourceRequest.getMemorySize() > supportedMaxContainerCapability
.getMemorySize()
|| mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
.getVirtualCores()) {
String diagMsg =
"MAP capability required is more than the supported "
+ "max container capability in the cluster. Killing the Job. mapResourceRequest: "
+ mapResourceRequest + " maxContainerCapability:"
+ supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
// set the resources
reqEvent.getCapability().setMemorySize(mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
mapResourceRequest.getVirtualCores());
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
boolean isMap = reqEvent.getAttemptID().getTaskId().getTaskType().
equals(TaskType.MAP);
if (isMap) {
handleMapContainerRequest(reqEvent);
} else {
if (reduceResourceRequest.equals(Resources.none())) {
reduceResourceRequest = reqEvent.getCapability();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceRequest.getMemorySize())));
LOG.info("reduceResourceRequest:" + reduceResourceRequest);
if (reduceResourceRequest.getMemorySize() > supportedMaxContainerCapability
.getMemorySize()
|| reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
.getVirtualCores()) {
String diagMsg =
"REDUCE capability required is more than the "
+ "supported max container capability in the cluster. Killing the "
+ "Job. reduceResourceRequest: " + reduceResourceRequest
+ " maxContainerCapability:"
+ supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
// set the resources
reqEvent.getCapability().setMemorySize(reduceResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
reduceResourceRequest.getVirtualCores());
if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent,
PRIORITY_REDUCE, reduceNodeLabelExpression));
} else {
pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
reduceNodeLabelExpression));
//reduces are added to pending and are slowly ramped up
}
handleReduceContainerRequest(reqEvent);
}
} else if (
@ -476,6 +416,103 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
@SuppressWarnings({ "unchecked" })
private void handleReduceContainerRequest(ContainerRequestEvent reqEvent) {
assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
TaskType.REDUCE));
Resource supportedMaxContainerCapability = getMaxContainerCapability();
JobId jobId = getJob().getID();
if (reduceResourceRequest.equals(Resources.none())) {
reduceResourceRequest = reqEvent.getCapability();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceRequest.getMemorySize())));
LOG.info("reduceResourceRequest:" + reduceResourceRequest);
}
boolean reduceContainerRequestAccepted = true;
if (reduceResourceRequest.getMemorySize() >
supportedMaxContainerCapability.getMemorySize()
||
reduceResourceRequest.getVirtualCores() >
supportedMaxContainerCapability.getVirtualCores()) {
reduceContainerRequestAccepted = false;
}
if (reduceContainerRequestAccepted) {
// set the resources
reqEvent.getCapability().setVirtualCores(
reduceResourceRequest.getVirtualCores());
reqEvent.getCapability().setMemorySize(
reduceResourceRequest.getMemorySize());
if (reqEvent.getEarlierAttemptFailed()) {
//previously failed reducers are added to the front for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent,
PRIORITY_REDUCE, reduceNodeLabelExpression));
} else {
//reduces are added to pending queue and are slowly ramped up
pendingReduces.add(new ContainerRequest(reqEvent,
PRIORITY_REDUCE, reduceNodeLabelExpression));
}
} else {
String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing" +
" the Job. reduceResourceRequest: " + reduceResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
@SuppressWarnings({ "unchecked" })
private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
TaskType.MAP));
Resource supportedMaxContainerCapability = getMaxContainerCapability();
JobId jobId = getJob().getID();
if (mapResourceRequest.equals(Resources.none())) {
mapResourceRequest = reqEvent.getCapability();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.MAP,
mapResourceRequest.getMemorySize())));
LOG.info("mapResourceRequest:" + mapResourceRequest);
}
boolean mapContainerRequestAccepted = true;
if (mapResourceRequest.getMemorySize() >
supportedMaxContainerCapability.getMemorySize()
||
mapResourceRequest.getVirtualCores() >
supportedMaxContainerCapability.getVirtualCores()) {
mapContainerRequestAccepted = false;
}
if(mapContainerRequestAccepted) {
// set the resources
reqEvent.getCapability().setMemorySize(
mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
mapResourceRequest.getVirtualCores());
scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
} else {
String diagMsg = "The required MAP capability is more than the " +
"supported max container capability in the cluster. Killing" +
" the Job. mapResourceRequest: " + mapResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
private static String getHost(String contMgrAddress) {
String host = contMgrAddress;
String[] hostport = host.split(":");

View File

@ -1792,12 +1792,18 @@ public class TestRMContainerAllocator {
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int memory, String[] hosts) {
return createReq(jobId, taskAttemptId, memory, hosts, false, false);
return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
}
private ContainerRequestEvent
createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
boolean earlierFailedAttempt, boolean reduce) {
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
return createReq(jobId, taskAttemptId, mem,
1, hosts, earlierFailedAttempt, reduce);
}
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
boolean reduce) {
TaskId taskId;
if (reduce) {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
@ -1806,7 +1812,7 @@ public class TestRMContainerAllocator {
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
Resource containerNeed = Resource.newInstance(memory, 1);
Resource containerNeed = Resource.newInstance(memory, vcore);
if (earlierFailedAttempt) {
return ContainerRequestEvent
.createContainerRequestEventForFailedContainer(attemptId,
@ -2607,6 +2613,86 @@ public class TestRMContainerAllocator {
}
@Test
public void testUnsupportedMapContainerRequirement() throws Exception {
final Resource maxContainerSupported = Resource.newInstance(1, 1);
final ApplicationId appId = ApplicationId.newInstance(1, 1);
final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
final JobId jobId =
MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
final Configuration conf = new Configuration();
final MyContainerAllocator allocator = new MyContainerAllocator(null,
conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
@Override
protected void register() {
}
@Override
protected ApplicationMasterProtocol createSchedulerProxy() {
return mockScheduler;
}
@Override
protected Resource getMaxContainerCapability() {
return maxContainerSupported;
}
};
ContainerRequestEvent mapRequestEvt = createReq(jobId, 0,
(int) (maxContainerSupported.getMemorySize() + 10),
maxContainerSupported.getVirtualCores(),
new String[0], false, false);
allocator.sendRequests(Arrays.asList(mapRequestEvt));
allocator.schedule();
Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
}
@Test
public void testUnsupportedReduceContainerRequirement() throws Exception {
final Resource maxContainerSupported = Resource.newInstance(1, 1);
final ApplicationId appId = ApplicationId.newInstance(1, 1);
final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 1);
final JobId jobId =
MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
final Configuration conf = new Configuration();
final MyContainerAllocator allocator = new MyContainerAllocator(null,
conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
@Override
protected void register() {
}
@Override
protected ApplicationMasterProtocol createSchedulerProxy() {
return mockScheduler;
}
@Override
protected Resource getMaxContainerCapability() {
return maxContainerSupported;
}
};
ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0,
(int) (maxContainerSupported.getMemorySize() + 10),
maxContainerSupported.getVirtualCores(),
new String[0], false, true);
allocator.sendRequests(Arrays.asList(reduceRequestEvt));
// Reducer container requests are added to the pending queue upon request,
// schedule all reducers here so that we can observe if reducer requests
// are accepted by RMContainerAllocator on RM side.
allocator.scheduleAllReduces();
allocator.schedule();
Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
}
@Test
public void testRMUnavailable()
throws Exception {