diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 97ec0c7baef..64d2c2bbce5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -531,7 +531,7 @@ public class TestJobImpl { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.setInt(MRJobConfig.NUM_REDUCES, 1); - AsyncDispatcher dispatcher = new AsyncDispatcher(); + DrainDispatcher dispatcher = new DrainDispatcher(); dispatcher.init(conf); dispatcher.start(); CyclicBarrier syncBarrier = new CyclicBarrier(2); @@ -608,6 +608,7 @@ public class TestJobImpl { NodeReport secondMapperNodeReport = nodeReports.get(1); job.handle(new JobUpdatedNodesEvent(job.getID(), Collections.singletonList(firstMapperNodeReport))); + dispatcher.await(); // complete the reducer for (TaskId taskId: job.tasks.keySet()) { if (taskId.getTaskType() == TaskType.REDUCE) {