From 58aefafe76570daa020c2cef68e8eab24172a20c Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Thu, 10 Apr 2014 22:09:16 +0000 Subject: [PATCH] svn merge -c 1586479 FIXES: MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down. Contributed by Gera Shegalov git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1586482 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../v2/app/rm/RMContainerAllocator.java | 6 ++- .../v2/app/TestRMContainerAllocator.java | 53 +++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e601da9d7b3..88e14eca987 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -24,6 +24,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5804. TestMRJobsWithProfiler#testProfiler timesout (Mit Desai via kihwal) + MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down + (Gera Shegalov via jlowe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 17cd19a2210..fc56700b247 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -90,6 +90,10 @@ public class RMContainerAllocator extends RMContainerRequestor private static final Priority PRIORITY_REDUCE; private static final Priority PRIORITY_MAP; + @VisibleForTesting + public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted " + + "to make room for pending map attempts"; + private Thread eventHandlingThread; private final AtomicBoolean stopped; @@ -1133,7 +1137,7 @@ public class RMContainerAllocator extends RMContainerRequestor TaskAttemptId id = reduceList.remove(0);//remove the one on top LOG.info("Preempting " + id); preemptionWaitingReduces.add(id); - eventHandler.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_KILL)); + eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC)); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 8f3d5f8a449..04256e99c9f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -360,6 +360,59 @@ public class TestRMContainerAllocator { assigned, false); } + @Test(timeout = 30000) + public void testReducerRampdownDiagnostics() throws Exception { + LOG.info("Running tesReducerRampdownDiagnostics"); + + final Configuration conf = new Configuration(); + conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); + final MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + final RMApp app = rm.submitApp(1024); + dispatcher.await(); + + final String host = "host1"; + final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048); + nm.nodeHeartbeat(true); + dispatcher.await(); + final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + final JobId jobId = MRBuilderUtils + .newJobId(appAttemptId.getApplicationId(), 0); + final Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + // add resources to scheduler + dispatcher.await(); + + // create the container request + final String[] locations = new String[] { host }; + allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + for (int i = 0; i < 1;) { + dispatcher.await(); + i += allocator.schedule().size(); + nm.nodeHeartbeat(true); + } + + allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); + while (allocator.getTaskAttemptKillEvents().size() == 0) { + dispatcher.await(); + allocator.schedule().size(); + nm.nodeHeartbeat(true); + } + final String killEventMessage = allocator.getTaskAttemptKillEvents().get(0) + .getMessage(); + Assert.assertTrue("No reducer rampDown preemption message", + killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC)); + } + @Test public void testMapReduceScheduling() throws Exception {