diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index b03d58d01a3..eb6b93292b8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -25,7 +25,10 @@ import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.concurrent.TimeoutException; +import com.google.common.base.Supplier; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -205,10 +208,10 @@ public class TestMRApp { conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f); // uberization forces full slowstart (1.0), so disable that conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); - Job job = app.submit(conf); - app.waitForState(job, JobState.RUNNING); - Assert.assertEquals("Num tasks not correct", 4, job.getTasks().size()); - Iterator it = job.getTasks().values().iterator(); + final Job job1 = app.submit(conf); + app.waitForState(job1, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size()); + Iterator it = job1.getTasks().values().iterator(); Task mapTask1 = it.next(); Task mapTask2 = it.next(); @@ -240,8 +243,20 @@ public class TestMRApp { app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask2, TaskState.SUCCEEDED); - TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, - 100); + final int checkIntervalMillis = 100; + final int waitForMillis = 800; + + waitFor(new Supplier() { + @Override + public Boolean get() { + TaskAttemptCompletionEvent[] events = job1 + .getTaskAttemptCompletionEvents(0, 100); + return events.length == 2; + } + }, checkIntervalMillis, waitForMillis); + + TaskAttemptCompletionEvent[] events = job1.getTaskAttemptCompletionEvents + (0, 100); Assert.assertEquals("Expecting 2 completion events for success", 2, events.length); @@ -253,12 +268,21 @@ public class TestMRApp { nr.setNodeState(NodeState.UNHEALTHY); updatedNodes.add(nr); app.getContext().getEventHandler() - .handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes)); + .handle(new JobUpdatedNodesEvent(job1.getID(), updatedNodes)); app.waitForState(task1Attempt, TaskAttemptState.KILLED); app.waitForState(task2Attempt, TaskAttemptState.KILLED); - events = job.getTaskAttemptCompletionEvents(0, 100); + waitFor(new Supplier() { + @Override + public Boolean get() { + TaskAttemptCompletionEvent[] events = job1 + .getTaskAttemptCompletionEvents(0, 100); + return events.length == 4; + } + }, checkIntervalMillis, waitForMillis); + + events = job1.getTaskAttemptCompletionEvents(0, 100); Assert.assertEquals("Expecting 2 more completion events for killed", 4, events.length); @@ -281,7 +305,16 @@ public class TestMRApp { app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask2, TaskState.RUNNING); - events = job.getTaskAttemptCompletionEvents(0, 100); + waitFor(new Supplier() { + @Override + public Boolean get() { + TaskAttemptCompletionEvent[] events = job1 + .getTaskAttemptCompletionEvents(0, 100); + return events.length == 5; + } + }, checkIntervalMillis, waitForMillis); + + events = job1.getTaskAttemptCompletionEvents(0, 100); Assert.assertEquals("Expecting 1 more completion events for success", 5, events.length); @@ -295,10 +328,11 @@ public class TestMRApp { conf = new Configuration(); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); - job = app.submit(conf); - app.waitForState(job, JobState.RUNNING); - Assert.assertEquals("No of tasks not correct", 4, job.getTasks().size()); - it = job.getTasks().values().iterator(); + + final Job job2 = app.submit(conf); + app.waitForState(job2, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 4, job2.getTasks().size()); + it = job2.getTasks().values().iterator(); mapTask1 = it.next(); mapTask2 = it.next(); Task reduceTask1 = it.next(); @@ -308,7 +342,16 @@ public class TestMRApp { app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask2, TaskState.RUNNING); - events = job.getTaskAttemptCompletionEvents(0, 100); + waitFor(new Supplier() { + @Override + public Boolean get() { + TaskAttemptCompletionEvent[] events = job2 + .getTaskAttemptCompletionEvents(0, 100); + return events.length == 2; + } + }, checkIntervalMillis, waitForMillis); + + events = job2.getTaskAttemptCompletionEvents(0, 100); Assert.assertEquals( "Expecting 2 completion events for killed & success of map1", 2, events.length); @@ -321,7 +364,16 @@ public class TestMRApp { TaskAttemptEventType.TA_DONE)); app.waitForState(mapTask2, TaskState.SUCCEEDED); - events = job.getTaskAttemptCompletionEvents(0, 100); + waitFor(new Supplier() { + @Override + public Boolean get() { + TaskAttemptCompletionEvent[] events = job2 + .getTaskAttemptCompletionEvents(0, 100); + return events.length == 3; + } + }, checkIntervalMillis, waitForMillis); + + events = job2.getTaskAttemptCompletionEvents(0, 100); Assert.assertEquals("Expecting 1 more completion events for success", 3, events.length); @@ -350,14 +402,30 @@ public class TestMRApp { .handle( new TaskAttemptEvent(task4Attempt.getID(), TaskAttemptEventType.TA_DONE)); - app.waitForState(reduceTask2, TaskState.SUCCEEDED); + app.waitForState(reduceTask2, TaskState.SUCCEEDED); - events = job.getTaskAttemptCompletionEvents(0, 100); + waitFor(new Supplier() { + @Override + public Boolean get() { + TaskAttemptCompletionEvent[] events = job2 + .getTaskAttemptCompletionEvents(0, 100); + return events.length == 5; + } + }, checkIntervalMillis, waitForMillis); + events = job2.getTaskAttemptCompletionEvents(0, 100); Assert.assertEquals("Expecting 2 more completion events for reduce success", 5, events.length); // job succeeds - app.waitForState(job, JobState.SUCCEEDED); + app.waitForState(job2, JobState.SUCCEEDED); + } + + private static void waitFor(Supplier predicate, int + checkIntervalMillis, int checkTotalMillis) throws InterruptedException { + try { + GenericTestUtils.waitFor(predicate, checkIntervalMillis, checkTotalMillis); + } catch (TimeoutException ex) { + } } @Test