MAPREDUCE-6697. Concurrent task limits should only be applied when necessary. Contributed by Nathan Roberts.

(cherry picked from commit a5c0476a99)
This commit is contained in:
Akira Ajisaka 2017-06-28 10:50:09 +09:00
parent ce57458ec5
commit 2a5a313539
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
2 changed files with 73 additions and 6 deletions

View File

@ -889,7 +889,8 @@ public class RMContainerAllocator extends RMContainerRequestor
private void applyConcurrentTaskLimits() { private void applyConcurrentTaskLimits() {
int numScheduledMaps = scheduledRequests.maps.size(); int numScheduledMaps = scheduledRequests.maps.size();
if (maxRunningMaps > 0 && numScheduledMaps > 0) { if (maxRunningMaps > 0 && numScheduledMaps > 0 &&
getJob().getTotalMaps() > maxRunningMaps) {
int maxRequestedMaps = Math.max(0, int maxRequestedMaps = Math.max(0,
maxRunningMaps - assignedRequests.maps.size()); maxRunningMaps - assignedRequests.maps.size());
int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size(); int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
@ -906,7 +907,8 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
int numScheduledReduces = scheduledRequests.reduces.size(); int numScheduledReduces = scheduledRequests.reduces.size();
if (maxRunningReduces > 0 && numScheduledReduces > 0) { if (maxRunningReduces > 0 && numScheduledReduces > 0 &&
getJob().getTotalReduces() > maxRunningReduces) {
int maxRequestedReduces = Math.max(0, int maxRequestedReduces = Math.max(0,
maxRunningReduces - assignedRequests.reduces.size()); maxRunningReduces - assignedRequests.reduces.size());
int reduceRequestLimit = Math.min(maxRequestedReduces, int reduceRequestLimit = Math.min(maxRequestedReduces,

View File

@ -2775,15 +2775,78 @@ public class TestRMContainerAllocator {
new Text(rmAddr), ugiToken.getService()); new Text(rmAddr), ugiToken.getService());
} }
@Test
public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception {
final int MAP_COUNT = 1;
final int REDUCE_COUNT = 1;
final int MAP_LIMIT = 1;
final int REDUCE_LIMIT = 1;
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator =
new MyContainerAllocator(null, conf, appAttemptId, mockJob,
SystemClock.getInstance()) {
@Override
protected void register() {
}
@Override
protected ApplicationMasterProtocol createSchedulerProxy() {
return mockScheduler;
}
@Override
protected void setRequestLimit(Priority priority,
Resource capability, int limit) {
Assert.fail("setRequestLimit() should not be invoked");
}
};
// create some map requests
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
for (int i = 0; i < reqMapEvents.length; ++i) {
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
}
allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests
ContainerRequestEvent[] reqReduceEvents =
new ContainerRequestEvent[REDUCE_COUNT];
for (int i = 0; i < reqReduceEvents.length; ++i) {
reqReduceEvents[i] =
createReq(jobId, i, 1024, new String[] {}, false, true);
}
allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule();
allocator.schedule();
allocator.schedule();
allocator.close();
}
@Test @Test
public void testConcurrentTaskLimits() throws Exception { public void testConcurrentTaskLimits() throws Exception {
final int MAP_COUNT = 5;
final int REDUCE_COUNT = 2;
final int MAP_LIMIT = 3; final int MAP_LIMIT = 3;
final int REDUCE_LIMIT = 1; final int REDUCE_LIMIT = 1;
LOG.info("Running testConcurrentTaskLimits"); LOG.info("Running testConcurrentTaskLimits");
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT); conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT); conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f); conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 1); appId, 1);
@ -2792,6 +2855,9 @@ public class TestRMContainerAllocator {
when(mockJob.getReport()).thenReturn( when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
final MockScheduler mockScheduler = new MockScheduler(appAttemptId); final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator = new MyContainerAllocator(null, conf, MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
appAttemptId, mockJob, SystemClock.getInstance()) { appAttemptId, mockJob, SystemClock.getInstance()) {
@ -2806,14 +2872,13 @@ public class TestRMContainerAllocator {
}; };
// create some map requests // create some map requests
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5]; ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
for (int i = 0; i < reqMapEvents.length; ++i) { for (int i = 0; i < reqMapEvents.length; ++i) {
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
} }
allocator.sendRequests(Arrays.asList(reqMapEvents)); allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests // create some reduce requests
ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2]; ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
for (int i = 0; i < reqReduceEvents.length; ++i) { for (int i = 0; i < reqReduceEvents.length; ++i) {
reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {}, reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
false, true); false, true);