MAPREDUCE-5827. Merging change r1589223 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589236 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-04-22 18:22:26 +00:00
parent f0d3664d83
commit d87463514a
2 changed files with 39 additions and 42 deletions

View File

@ -49,6 +49,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5642. TestMiniMRChildTask fails on Windows. MAPREDUCE-5642. TestMiniMRChildTask fails on Windows.
(Chuan Liu via cnauroth) (Chuan Liu via cnauroth)
MAPREDUCE-5827. TestSpeculativeExecutionWithMRApp fails.
(Zhijie Shen via cnauroth)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -40,22 +40,26 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public class TestSpeculativeExecutionWithMRApp { public class TestSpeculativeExecutionWithMRApp {
private static final int NUM_MAPPERS = 5; private static final int NUM_MAPPERS = 5;
private static final int NUM_REDUCERS = 0; private static final int NUM_REDUCERS = 0;
@Test(timeout = 60000) @Test
public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = new SystemClock();
ControlledClock clock = new ControlledClock(actualClock); final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis()); clock.setTime(System.currentTimeMillis());
MRApp app = MRApp app =
@ -88,7 +92,7 @@ public class TestSpeculativeExecutionWithMRApp {
Random generator = new Random(); Random generator = new Random();
Object[] taskValues = tasks.values().toArray(); Object[] taskValues = tasks.values().toArray();
Task taskToBeSpeculated = final Task taskToBeSpeculated =
(Task) taskValues[generator.nextInt(taskValues.length)]; (Task) taskValues[generator.nextInt(taskValues.length)];
// Other than one random task, finish every other task. // Other than one random task, finish every other task.
@ -105,30 +109,28 @@ public class TestSpeculativeExecutionWithMRApp {
} }
} }
int maxTimeWait = 10; GenericTestUtils.waitFor(new Supplier<Boolean>() {
boolean successfullySpeculated = false; @Override
TaskAttempt[] ta = null; public Boolean get() {
while (maxTimeWait > 0 && !successfullySpeculated) { if (taskToBeSpeculated.getAttempts().size() != 2) {
if (taskToBeSpeculated.getAttempts().size() != 2) { clock.setTime(System.currentTimeMillis() + 1000);
Thread.sleep(1000); return false;
clock.setTime(System.currentTimeMillis() + 20000); } else {
} else { return true;
successfullySpeculated = true; }
// finish 1st TA, 2nd will be killed
ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
} }
maxTimeWait--; }, 1000, 60000);
} // finish 1st TA, 2nd will be killed
Assert TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
.assertTrue("Couldn't speculate successfully", successfullySpeculated);
verifySpeculationMessage(app, ta); verifySpeculationMessage(app, ta);
app.waitForState(Service.STATE.STOPPED);
} }
@Test(timeout = 60000) @Test
public void testSepculateSuccessfulWithUpdateEvents() throws Exception { public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = new SystemClock();
ControlledClock clock = new ControlledClock(actualClock); final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis()); clock.setTime(System.currentTimeMillis());
MRApp app = MRApp app =
@ -200,21 +202,21 @@ public class TestSpeculativeExecutionWithMRApp {
} }
} }
int maxTimeWait = 5; final Task speculatedTaskConst = speculatedTask;
boolean successfullySpeculated = false; GenericTestUtils.waitFor(new Supplier<Boolean>() {
TaskAttempt[] ta = null; @Override
while (maxTimeWait > 0 && !successfullySpeculated) { public Boolean get() {
if (speculatedTask.getAttempts().size() != 2) { if (speculatedTaskConst.getAttempts().size() != 2) {
Thread.sleep(1000); clock.setTime(System.currentTimeMillis() + 1000);
} else { return false;
successfullySpeculated = true; } else {
ta = makeFirstAttemptWin(appEventHandler, speculatedTask); return true;
}
} }
maxTimeWait--; }, 1000, 60000);
} TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated);
verifySpeculationMessage(app, ta); verifySpeculationMessage(app, ta);
app.waitForState(Service.STATE.STOPPED);
} }
private static TaskAttempt[] makeFirstAttemptWin( private static TaskAttempt[] makeFirstAttemptWin(
@ -234,15 +236,7 @@ public class TestSpeculativeExecutionWithMRApp {
private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta) private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
throws Exception { throws Exception {
app.waitForState(ta[0], TaskAttemptState.SUCCEEDED); app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
app.waitForState(ta[1], TaskAttemptState.KILLED); // The speculative attempt may be not killed before the MR job succeeds.
boolean foundSpecMsg = false;
for (String msg : ta[1].getDiagnostics()) {
if (msg.contains("Speculation")) {
foundSpecMsg = true;
break;
}
}
Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
} }
private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id, private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,