MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down. Contributed by Gera Shegalov
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1586479 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cde7b17566
commit
26421dd7dc
|
@ -160,6 +160,9 @@ Release 2.5.0 - UNRELEASED
|
|||
MAPREDUCE-5804. TestMRJobsWithProfiler#testProfiler times out (Mit Desai
|
||||
via kihwal)
|
||||
|
||||
MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down
|
||||
(Gera Shegalov via jlowe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -92,6 +92,10 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
private static final Priority PRIORITY_REDUCE;
|
||||
private static final Priority PRIORITY_MAP;
|
||||
|
||||
@VisibleForTesting
|
||||
public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
|
||||
+ "to make room for pending map attempts";
|
||||
|
||||
private Thread eventHandlingThread;
|
||||
private final AtomicBoolean stopped;
|
||||
|
||||
|
@ -1153,7 +1157,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
TaskAttemptId id = reduceList.remove(0);//remove the one on top
|
||||
LOG.info("Preempting " + id);
|
||||
preemptionWaitingReduces.add(id);
|
||||
eventHandler.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_KILL));
|
||||
eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -362,6 +362,59 @@ public class TestRMContainerAllocator {
|
|||
assigned, false);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testReducerRampdownDiagnostics() throws Exception {
|
||||
LOG.info("Running tesReducerRampdownDiagnostics");
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
|
||||
final MyResourceManager rm = new MyResourceManager(conf);
|
||||
rm.start();
|
||||
final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
|
||||
.getDispatcher();
|
||||
final RMApp app = rm.submitApp(1024);
|
||||
dispatcher.await();
|
||||
|
||||
final String host = "host1";
|
||||
final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048);
|
||||
nm.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||
.getAppAttemptId();
|
||||
rm.sendAMLaunched(appAttemptId);
|
||||
dispatcher.await();
|
||||
final JobId jobId = MRBuilderUtils
|
||||
.newJobId(appAttemptId.getApplicationId(), 0);
|
||||
final Job mockJob = mock(Job.class);
|
||||
when(mockJob.getReport()).thenReturn(
|
||||
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
||||
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
||||
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||
appAttemptId, mockJob);
|
||||
// add resources to scheduler
|
||||
dispatcher.await();
|
||||
|
||||
// create the container request
|
||||
final String[] locations = new String[] { host };
|
||||
allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
|
||||
for (int i = 0; i < 1;) {
|
||||
dispatcher.await();
|
||||
i += allocator.schedule().size();
|
||||
nm.nodeHeartbeat(true);
|
||||
}
|
||||
|
||||
allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
|
||||
while (allocator.getTaskAttemptKillEvents().size() == 0) {
|
||||
dispatcher.await();
|
||||
allocator.schedule().size();
|
||||
nm.nodeHeartbeat(true);
|
||||
}
|
||||
final String killEventMessage = allocator.getTaskAttemptKillEvents().get(0)
|
||||
.getMessage();
|
||||
Assert.assertTrue("No reducer rampDown preemption message",
|
||||
killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapReduceScheduling() throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue