MAPREDUCE-4785. TestMRApp occasionally fails (haibochen via rkanter)

(cherry picked from commit ff0ee84d77)
(cherry picked from commit f9f57265cf)

Conflicts:
	hadoop-mapreduce-project/CHANGES.txt
This commit is contained in:
Robert Kanter 2016-03-03 16:38:07 -08:00 committed by Wangda Tan
parent a996889313
commit c9ba6fa9ba
2 changed files with 88 additions and 18 deletions

View File

@ -20,6 +20,8 @@ Release 2.7.3 - UNRELEASED
BUG FIXES BUG FIXES
MAPREDUCE-4785. TestMRApp occasionally fails (haibochen via rkanter)
MAPREDUCE-6540. TestMRTimelineEventHandling fails (sjlee) MAPREDUCE-6540. TestMRTimelineEventHandling fails (sjlee)
MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API. MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API.

View File

@ -25,7 +25,10 @@ import static org.mockito.Mockito.verify;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -205,10 +208,10 @@ public class TestMRApp {
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f); conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
// uberization forces full slowstart (1.0), so disable that // uberization forces full slowstart (1.0), so disable that
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Job job = app.submit(conf); final Job job1 = app.submit(conf);
app.waitForState(job, JobState.RUNNING); app.waitForState(job1, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 4, job.getTasks().size()); Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator(); Iterator<Task> it = job1.getTasks().values().iterator();
Task mapTask1 = it.next(); Task mapTask1 = it.next();
Task mapTask2 = it.next(); Task mapTask2 = it.next();
@ -240,8 +243,20 @@ public class TestMRApp {
app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED); app.waitForState(mapTask2, TaskState.SUCCEEDED);
TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, final int checkIntervalMillis = 100;
100); final int waitForMillis = 800;
waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
TaskAttemptCompletionEvent[] events = job1
.getTaskAttemptCompletionEvents(0, 100);
return events.length == 2;
}
}, checkIntervalMillis, waitForMillis);
TaskAttemptCompletionEvent[] events = job1.getTaskAttemptCompletionEvents
(0, 100);
Assert.assertEquals("Expecting 2 completion events for success", 2, Assert.assertEquals("Expecting 2 completion events for success", 2,
events.length); events.length);
@ -253,12 +268,21 @@ public class TestMRApp {
nr.setNodeState(NodeState.UNHEALTHY); nr.setNodeState(NodeState.UNHEALTHY);
updatedNodes.add(nr); updatedNodes.add(nr);
app.getContext().getEventHandler() app.getContext().getEventHandler()
.handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes)); .handle(new JobUpdatedNodesEvent(job1.getID(), updatedNodes));
app.waitForState(task1Attempt, TaskAttemptState.KILLED); app.waitForState(task1Attempt, TaskAttemptState.KILLED);
app.waitForState(task2Attempt, TaskAttemptState.KILLED); app.waitForState(task2Attempt, TaskAttemptState.KILLED);
events = job.getTaskAttemptCompletionEvents(0, 100); waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
TaskAttemptCompletionEvent[] events = job1
.getTaskAttemptCompletionEvents(0, 100);
return events.length == 4;
}
}, checkIntervalMillis, waitForMillis);
events = job1.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals("Expecting 2 more completion events for killed", 4, Assert.assertEquals("Expecting 2 more completion events for killed", 4,
events.length); events.length);
@ -281,7 +305,16 @@ public class TestMRApp {
app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.RUNNING); app.waitForState(mapTask2, TaskState.RUNNING);
events = job.getTaskAttemptCompletionEvents(0, 100); waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
TaskAttemptCompletionEvent[] events = job1
.getTaskAttemptCompletionEvents(0, 100);
return events.length == 5;
}
}, checkIntervalMillis, waitForMillis);
events = job1.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals("Expecting 1 more completion events for success", 5, Assert.assertEquals("Expecting 1 more completion events for success", 5,
events.length); events.length);
@ -295,10 +328,11 @@ public class TestMRApp {
conf = new Configuration(); conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING); final Job job2 = app.submit(conf);
Assert.assertEquals("No of tasks not correct", 4, job.getTasks().size()); app.waitForState(job2, JobState.RUNNING);
it = job.getTasks().values().iterator(); Assert.assertEquals("No of tasks not correct", 4, job2.getTasks().size());
it = job2.getTasks().values().iterator();
mapTask1 = it.next(); mapTask1 = it.next();
mapTask2 = it.next(); mapTask2 = it.next();
Task reduceTask1 = it.next(); Task reduceTask1 = it.next();
@ -308,7 +342,16 @@ public class TestMRApp {
app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.RUNNING); app.waitForState(mapTask2, TaskState.RUNNING);
events = job.getTaskAttemptCompletionEvents(0, 100); waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
TaskAttemptCompletionEvent[] events = job2
.getTaskAttemptCompletionEvents(0, 100);
return events.length == 2;
}
}, checkIntervalMillis, waitForMillis);
events = job2.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals( Assert.assertEquals(
"Expecting 2 completion events for killed & success of map1", 2, "Expecting 2 completion events for killed & success of map1", 2,
events.length); events.length);
@ -321,7 +364,16 @@ public class TestMRApp {
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
app.waitForState(mapTask2, TaskState.SUCCEEDED); app.waitForState(mapTask2, TaskState.SUCCEEDED);
events = job.getTaskAttemptCompletionEvents(0, 100); waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
TaskAttemptCompletionEvent[] events = job2
.getTaskAttemptCompletionEvents(0, 100);
return events.length == 3;
}
}, checkIntervalMillis, waitForMillis);
events = job2.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals("Expecting 1 more completion events for success", 3, Assert.assertEquals("Expecting 1 more completion events for success", 3,
events.length); events.length);
@ -350,14 +402,30 @@ public class TestMRApp {
.handle( .handle(
new TaskAttemptEvent(task4Attempt.getID(), new TaskAttemptEvent(task4Attempt.getID(),
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
app.waitForState(reduceTask2, TaskState.SUCCEEDED); app.waitForState(reduceTask2, TaskState.SUCCEEDED);
events = job.getTaskAttemptCompletionEvents(0, 100); waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
TaskAttemptCompletionEvent[] events = job2
.getTaskAttemptCompletionEvents(0, 100);
return events.length == 5;
}
}, checkIntervalMillis, waitForMillis);
events = job2.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals("Expecting 2 more completion events for reduce success", Assert.assertEquals("Expecting 2 more completion events for reduce success",
5, events.length); 5, events.length);
// job succeeds // job succeeds
app.waitForState(job, JobState.SUCCEEDED); app.waitForState(job2, JobState.SUCCEEDED);
}
private static void waitFor(Supplier<Boolean> predicate, int
checkIntervalMillis, int checkTotalMillis) throws InterruptedException {
try {
GenericTestUtils.waitFor(predicate, checkIntervalMillis, checkTotalMillis);
} catch (TimeoutException ex) {
}
} }
@Test @Test