svn merge -c 1586479 FIXES: MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down. Contributed by Gera Shegalov

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1586482 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-04-10 22:09:16 +00:00
parent a495fdb86f
commit 58aefafe76
3 changed files with 61 additions and 1 deletions

View File

@ -24,6 +24,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5804. TestMRJobsWithProfiler#testProfiler timesout (Mit Desai
via kihwal)
MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down
(Gera Shegalov via jlowe)
OPTIMIZATIONS
BUG FIXES

View File

@ -90,6 +90,10 @@ public class RMContainerAllocator extends RMContainerRequestor
private static final Priority PRIORITY_REDUCE;
private static final Priority PRIORITY_MAP;
@VisibleForTesting
public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
+ "to make room for pending map attempts";
private Thread eventHandlingThread;
private final AtomicBoolean stopped;
@ -1133,7 +1137,7 @@ public int compare(TaskAttemptId o1, TaskAttemptId o2) {
TaskAttemptId id = reduceList.remove(0);//remove the one on top
LOG.info("Preempting " + id);
preemptionWaitingReduces.add(id);
eventHandler.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_KILL));
eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC));
}
}

View File

@ -360,6 +360,59 @@ public void testResource() throws Exception {
assigned, false);
}
@Test(timeout = 30000)
public void testReducerRampdownDiagnostics() throws Exception {
LOG.info("Running tesReducerRampdownDiagnostics");
final Configuration conf = new Configuration();
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
final RMApp app = rm.submitApp(1024);
dispatcher.await();
final String host = "host1";
final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048);
nm.nodeHeartbeat(true);
dispatcher.await();
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
final JobId jobId = MRBuilderUtils
.newJobId(appAttemptId.getApplicationId(), 0);
final Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// add resources to scheduler
dispatcher.await();
// create the container request
final String[] locations = new String[] { host };
allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
for (int i = 0; i < 1;) {
dispatcher.await();
i += allocator.schedule().size();
nm.nodeHeartbeat(true);
}
allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
while (allocator.getTaskAttemptKillEvents().size() == 0) {
dispatcher.await();
allocator.schedule().size();
nm.nodeHeartbeat(true);
}
final String killEventMessage = allocator.getTaskAttemptKillEvents().get(0)
.getMessage();
Assert.assertTrue("No reducer rampDown preemption message",
killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC));
}
@Test
public void testMapReduceScheduling() throws Exception {