From d87463514ad04940f9fdd83aee463d5dea5331ec Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Tue, 22 Apr 2014 18:22:26 +0000 Subject: [PATCH] MAPREDUCE-5827. Merging change r1589223 from trunk to branch-2. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589236 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/TestSpeculativeExecutionWithMRApp.java | 78 +++++++++---------- 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1720ad86244..a44323d057d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -49,6 +49,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5642. TestMiniMRChildTask fails on Windows. (Chuan Liu via cnauroth) + MAPREDUCE-5827. TestSpeculativeExecutionWithMRApp fails. + (Zhijie Shen via cnauroth) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index 7179a2bdfe0..601904b28a6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -40,22 +40,26 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Test; +import com.google.common.base.Supplier; + @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestSpeculativeExecutionWithMRApp { private static final int NUM_MAPPERS = 5; private static final int NUM_REDUCERS = 0; - @Test(timeout = 60000) + @Test public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { Clock actualClock = new SystemClock(); - ControlledClock clock = new ControlledClock(actualClock); + final ControlledClock clock = new ControlledClock(actualClock); clock.setTime(System.currentTimeMillis()); MRApp app = @@ -88,7 +92,7 @@ public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { Random generator = new Random(); Object[] taskValues = tasks.values().toArray(); - Task taskToBeSpeculated = + final Task taskToBeSpeculated = (Task) taskValues[generator.nextInt(taskValues.length)]; // Other than one random task, finish every other task. @@ -105,30 +109,28 @@ public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { } } - int maxTimeWait = 10; - boolean successfullySpeculated = false; - TaskAttempt[] ta = null; - while (maxTimeWait > 0 && !successfullySpeculated) { - if (taskToBeSpeculated.getAttempts().size() != 2) { - Thread.sleep(1000); - clock.setTime(System.currentTimeMillis() + 20000); - } else { - successfullySpeculated = true; - // finish 1st TA, 2nd will be killed - ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + if (taskToBeSpeculated.getAttempts().size() != 2) { + clock.setTime(System.currentTimeMillis() + 1000); + return false; + } else { + return true; + } } - maxTimeWait--; - } - Assert - .assertTrue("Couldn't speculate successfully", successfullySpeculated); + }, 1000, 60000); + // finish 1st TA, 2nd will be killed + TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated); verifySpeculationMessage(app, ta); + app.waitForState(Service.STATE.STOPPED); } - @Test(timeout = 60000) + @Test public void testSepculateSuccessfulWithUpdateEvents() throws Exception { Clock actualClock = new SystemClock(); - ControlledClock clock = new ControlledClock(actualClock); + final ControlledClock clock = new ControlledClock(actualClock); clock.setTime(System.currentTimeMillis()); MRApp app = @@ -200,21 +202,21 @@ public void testSepculateSuccessfulWithUpdateEvents() throws Exception { } } - int maxTimeWait = 5; - boolean successfullySpeculated = false; - TaskAttempt[] ta = null; - while (maxTimeWait > 0 && !successfullySpeculated) { - if (speculatedTask.getAttempts().size() != 2) { - Thread.sleep(1000); - } else { - successfullySpeculated = true; - ta = makeFirstAttemptWin(appEventHandler, speculatedTask); + final Task speculatedTaskConst = speculatedTask; + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + if (speculatedTaskConst.getAttempts().size() != 2) { + clock.setTime(System.currentTimeMillis() + 1000); + return false; + } else { + return true; + } } - maxTimeWait--; - } - Assert - .assertTrue("Couldn't speculate successfully", successfullySpeculated); + }, 1000, 60000); + TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask); verifySpeculationMessage(app, ta); + app.waitForState(Service.STATE.STOPPED); } private static TaskAttempt[] makeFirstAttemptWin( @@ -234,15 +236,7 @@ private static TaskAttempt[] makeFirstAttemptWin( private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta) throws Exception { app.waitForState(ta[0], TaskAttemptState.SUCCEEDED); - app.waitForState(ta[1], TaskAttemptState.KILLED); - boolean foundSpecMsg = false; - for (String msg : ta[1].getDiagnostics()) { - if (msg.contains("Speculation")) { - foundSpecMsg = true; - break; - } - } - Assert.assertTrue("No speculation diagnostics!", foundSpecMsg); + // The speculative attempt may be not killed before the MR job succeeds. } private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,