diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 64aff9a574d..e024c96ed8c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -79,6 +79,9 @@ Release 2.4.1 - UNRELEASED MAPREDUCE-5827. TestSpeculativeExecutionWithMRApp fails. (Zhijie Shen via cnauroth) + MAPREDUCE-5833. TestRMContainerAllocator fails ocassionally. + (Zhijie Shen via cnauroth) + Release 2.4.0 - 2014-04-07 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index fc56700b247..4418aa259be 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -151,7 +151,8 @@ public class RMContainerAllocator extends RMContainerRequestor private long retryInterval; private long retrystartTime; - BlockingQueue eventQueue + @VisibleForTesting + protected BlockingQueue eventQueue = new LinkedBlockingQueue(); private ScheduleStats scheduleStats = new ScheduleStats(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 9a0de8d53bf..1785864ae6d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; @@ -62,6 +63,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; @@ -73,6 +75,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -104,6 +107,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Supplier; + @SuppressWarnings("unchecked") public class TestRMContainerAllocator { @@ -579,7 +584,7 @@ public class TestRMContainerAllocator { MyContainerAllocator allocator = (MyContainerAllocator) mrApp .getContainerAllocator(); - mrApp.waitForState(job, JobState.RUNNING); + mrApp.waitForInternalState((JobImpl) job, JobStateInternal.RUNNING); amDispatcher.await(); // Wait till all map-attempts request for containers @@ -731,7 +736,7 @@ public class TestRMContainerAllocator { MyContainerAllocator allocator = (MyContainerAllocator) mrApp .getContainerAllocator(); - mrApp.waitForState(job, JobState.RUNNING); + mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING); amDispatcher.await(); // Wait till all map-attempts request for containers @@ -1550,7 +1555,15 @@ public class TestRMContainerAllocator { } // API to be used by tests - public List schedule() { + public List schedule() + throws Exception { + // before doing heartbeat with RM, drain all the outstanding events to + // ensure all the requests before this heartbeat is to be handled + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return eventQueue.isEmpty(); + } + }, 100, 10000); // run the scheduler try { super.heartbeat(); @@ -1586,7 +1599,6 @@ public class TestRMContainerAllocator { public boolean isUnregistered() { return isUnregistered; } - } @Test