diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 55bfdefcb9a..dd07d459ccc 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -96,6 +96,8 @@ Release 0.23.1 - Unreleased
     immediately after downloading a resource instead of always waiting for a
     second. (Siddarth Seth via vinodkv)
 
+    MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
+
   BUG FIXES
 
     MAPREDUCE-2950. [Rumen] Fixed TestUserResolve. (Ravi Gummadi via amarrk)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 5c2e0fd0c88..8ba241ec02d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -228,7 +228,7 @@ public class MRAppMaster extends CompositeService {
           + recoveryEnabled + " recoverySupportedByCommitter: "
           + recoverySupportedByCommitter + " ApplicationAttemptID: "
           + appAttemptID.getAttemptId());
-      dispatcher = new AsyncDispatcher();
+      dispatcher = createDispatcher();
       addIfService(dispatcher);
     }
 
@@ -291,6 +291,10 @@ public class MRAppMaster extends CompositeService {
     super.init(conf);
   } // end of init()
 
+  protected Dispatcher createDispatcher() {
+    return new AsyncDispatcher();
+  }
+
   private OutputCommitter createOutputCommitter(Configuration conf) {
     OutputCommitter committer = null;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
index 4abcd341847..9094c77cc33 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
@@ -53,6 +53,7 @@ public interface Job {
   int getTotalReduces();
   int getCompletedMaps();
   int getCompletedReduces();
+  float getProgress();
   boolean isUber();
   String getUserName();
   String getQueueName();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index cb9171cedc2..183f15156c5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -128,6 +128,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private final String username;
   private final OutputCommitter committer;
   private final Map<JobACL, AccessControlList> jobACLs;
+  private float setupWeight = 0.05f;
+  private float cleanupWeight = 0.05f;
+  private float mapWeight = 0.0f;
+  private float reduceWeight = 0.0f;
   private final Set<TaskId> completedTasksFromPreviousRun;
   private final List<AMInfo> amInfos;
   private final Lock readLock;
@@ -147,7 +151,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private final long appSubmitTime;
 
   private boolean lazyTasksCopyNeeded = false;
-  private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
+  volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
   private Counters jobCounters = newCounters();
   // FIXME:
   //
@@ -353,6 +357,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private long startTime;
   private long finishTime;
   private float setupProgress;
+  private float mapProgress;
+  private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
@@ -587,30 +593,51 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
       }
 
+      computeProgress();
       return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
           appSubmitTime, startTime, finishTime, setupProgress,
-          computeProgress(mapTasks), computeProgress(reduceTasks),
+          this.mapProgress, this.reduceProgress,
           cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
     } finally {
       readLock.unlock();
     }
   }
 
-  private float computeProgress(Set<TaskId> taskIds) {
-    readLock.lock();
+  @Override
+  public float getProgress() {
+    this.readLock.lock();
     try {
-      float progress = 0;
-      for (TaskId taskId : taskIds) {
-        Task task = tasks.get(taskId);
-        progress += task.getProgress();
-      }
-      int taskIdsSize = taskIds.size();
-      if (taskIdsSize != 0) {
-        progress = progress/taskIdsSize;
-      }
-      return progress;
+      computeProgress();
+      return (this.setupProgress * this.setupWeight + this.cleanupProgress
+          * this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress
+          * this.reduceWeight);
     } finally {
-      readLock.unlock();
+      this.readLock.unlock();
+    }
+  }
+
+  private void computeProgress() {
+    this.readLock.lock();
+    try {
+      float mapProgress = 0f;
+      float reduceProgress = 0f;
+      for (Task task : this.tasks.values()) {
+        if (task.getType() == TaskType.MAP) {
+          mapProgress += task.getProgress();
+        } else {
+          reduceProgress += task.getProgress();
+        }
+      }
+      if (this.numMapTasks != 0) {
+        mapProgress = mapProgress / this.numMapTasks;
+      }
+      if (this.numReduceTasks != 0) {
+        reduceProgress = reduceProgress / this.numReduceTasks;
+      }
+      this.mapProgress = mapProgress;
+      this.reduceProgress = reduceProgress;
+    } finally {
+      this.readLock.unlock();
     }
   }
@@ -731,7 +758,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   static JobState checkJobCompleteSuccess(JobImpl job) {
     // check for Job success
-    if (job.completedTaskCount == job.getTasks().size()) {
+    if (job.completedTaskCount == job.tasks.size()) {
       try {
         // Commit job & do cleanup
         job.getCommitter().commitJob(job.getJobContext());
@@ -970,6 +997,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
         job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
+      } else if (job.numMapTasks == 0) {
+        job.reduceWeight = 0.9f;
+      } else if (job.numReduceTasks == 0) {
+        job.mapWeight = 0.9f;
+      } else {
+        job.mapWeight = job.reduceWeight = 0.45f;
       }
 
       checkTaskLimits();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index b0708540834..b403751154b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -376,7 +376,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     try {
       TaskAttempt bestAttempt = selectBestAttempt();
       if (bestAttempt == null) {
-        return 0;
+        return 0f;
       }
       return bestAttempt.getProgress();
     } finally {
@@ -457,9 +457,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         result = at; //The first time around
       }
       // calculate the best progress
-      if (at.getProgress() > progress) {
+      float attemptProgress = at.getProgress();
+      if (attemptProgress > progress) {
         result = at;
-        progress = at.getProgress();
+        progress = attemptProgress;
       }
     }
     return result;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
index 4281e0a4842..5276276c4e8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
@@ -128,25 +128,7 @@ public abstract class RMCommunicator extends AbstractService {
   protected float getApplicationProgress() {
     // For now just a single job. In future when we have a DAG, we need an
     // aggregate progress.
-    JobReport report = this.job.getReport();
-    float setupWeight = 0.05f;
-    float cleanupWeight = 0.05f;
-    float mapWeight = 0.0f;
-    float reduceWeight = 0.0f;
-    int numMaps = this.job.getTotalMaps();
-    int numReduces = this.job.getTotalReduces();
-    if (numMaps == 0 && numReduces == 0) {
-    } else if (numMaps == 0) {
-      reduceWeight = 0.9f;
-    } else if (numReduces == 0) {
-      mapWeight = 0.9f;
-    } else {
-      mapWeight = reduceWeight = 0.45f;
-    }
-    return (report.getSetupProgress() * setupWeight
-        + report.getCleanupProgress() * cleanupWeight
-        + report.getMapProgress() * mapWeight + report.getReduceProgress()
-        * reduceWeight);
+    return this.job.getProgress();
   }
 
   protected void register() {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 2f25075ee84..7aa638afe6b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -161,7 +161,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
         " finishedContainers=" +
         response.getCompletedContainersStatuses().size() +
         " resourcelimit=" + availableResources +
-        "knownNMs=" + clusterNmCount);
+        " knownNMs=" + clusterNmCount);
 
     ask.clear();
     release.clear();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 561ecac8a91..11a8671707d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -115,7 +115,8 @@ public class MRApp extends MRAppMaster {
     applicationId.setId(0);
   }
 
-  public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart) {
     this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
   }
 
@@ -141,10 +142,17 @@ public class MRApp extends MRAppMaster {
     return containerId;
   }
 
-  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
       boolean cleanOnStart, int startCount) {
-    super(getApplicationAttemptId(applicationId, startCount), getContainerId(
-      applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System
+    this(getApplicationAttemptId(applicationId, startCount), getContainerId(
+      applicationId, startCount), maps, reduces, autoComplete, testName,
+      cleanOnStart, startCount);
+  }
+
+  public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+      int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount) {
+    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, System
       .currentTimeMillis());
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
@@ -205,9 +213,9 @@ public class MRApp extends MRAppMaster {
       TaskReport report = task.getReport();
       while (!finalState.equals(report.getTaskState()) &&
           timeoutSecs++ < 20) {
-        System.out.println("Task State is : " + report.getTaskState() +
-            " Waiting for state : " + finalState +
-            " progress : " + report.getProgress());
+        System.out.println("Task State for " + task.getID() + " is : "
+            + report.getTaskState() + " Waiting for state : " + finalState
+            + " progress : " + report.getProgress());
         report = task.getReport();
         Thread.sleep(500);
       }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
index 5a67576c444..ad3e4a87120 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
@@ -425,6 +425,11 @@ public class MockJobs extends MockApps {
         return report;
       }
 
+      @Override
+      public float getProgress() {
+        return 0;
+      }
+
       @Override
       public Counters getCounters() {
         return counters;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
index 785d8a7d03f..de3909ea428 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -36,15 +37,20 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
@@ -78,6 +84,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Test;
 
+@SuppressWarnings("unchecked")
 public class TestRMContainerAllocator {
 
   static final Log LOG = LogFactory
@@ -338,98 +345,155 @@ public class TestRMContainerAllocator {
     }
   }
 
-  private static class FakeJob extends JobImpl {
-
-    public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
-        int numMaps, int numReduces) {
-      super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
-          appAttemptID, conf, null, null, null, null, null, null, null, null,
-          true, null, System.currentTimeMillis(), null);
-      this.jobId = getID();
-      this.numMaps = numMaps;
-      this.numReduces = numReduces;
-    }
-
-    private float setupProgress;
-    private float mapProgress;
-    private float reduceProgress;
-    private float cleanupProgress;
-    private final int numMaps;
-    private final int numReduces;
-    private JobId jobId;
-
-    void setProgress(float setupProgress, float mapProgress,
-        float reduceProgress, float cleanupProgress) {
-      this.setupProgress = setupProgress;
-      this.mapProgress = mapProgress;
-      this.reduceProgress = reduceProgress;
-      this.cleanupProgress = cleanupProgress;
-    }
-
-    @Override
-    public int getTotalMaps() { return this.numMaps; }
-    @Override
-    public int getTotalReduces() { return this.numReduces;}
-
-    @Override
-    public JobReport getReport() {
-      return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
-          JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
-          this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
-    }
-  }
-
   @Test
   public void testReportedAppProgress() throws Exception {
 
     LOG.info("Running testReportedAppProgress");
 
     Configuration conf = new Configuration();
-    MyResourceManager rm = new MyResourceManager(conf);
+    final MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+    DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
         .getDispatcher();
 
     // Submit the application
-    RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    RMApp rmApp = rm.submitApp(1024);
+    rmDispatcher.await();
 
-    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rmDispatcher.await();
 
-    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+    final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rmDispatcher.await();
 
-    FakeJob job = new FakeJob(appAttemptId, conf, 2, 2);
-    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, job);
+    MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+        appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+      protected ContainerAllocator createContainerAllocator(
+          ClientService clientService, AppContext context) {
+        return new MyContainerAllocator(rm, appAttemptId, context);
+      };
+    };
+
+    Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
+
+    mrApp.submit(conf);
+    Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+        .getValue();
+
+    DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+
+    MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+        .getContainerAllocator();
+
+    mrApp.waitForState(job, JobState.RUNNING);
+
+    amDispatcher.await();
+    // Wait till all map-attempts request for containers
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.MAP) {
+        mrApp.waitForState(t.getAttempts().values().iterator().next(),
+            TaskAttemptState.UNASSIGNED);
+      }
+    }
+    amDispatcher.await();
+
+    allocator.schedule();
+    rmDispatcher.await();
+    amNodeManager.nodeHeartbeat(true);
+    rmDispatcher.await();
+    allocator.schedule();
+    rmDispatcher.await();
+
+    // Wait for all map-tasks to be running
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.MAP) {
+        mrApp.waitForState(t, TaskState.RUNNING);
+      }
+    }
 
     allocator.schedule(); // Send heartbeat
-    dispatcher.await();
-    Assert.assertEquals(0.0, app.getProgress(), 0.0);
+    rmDispatcher.await();
+    Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
 
-    job.setProgress(100, 10, 0, 0);
+    // Finish off 1 map.
+    Iterator<Task> it = job.getTasks().values().iterator();
+    finishNextNTasks(mrApp, it, 1);
     allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(9.5f, app.getProgress(), 0.0);
+    rmDispatcher.await();
+    Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
 
-    job.setProgress(100, 80, 0, 0);
+    // Finish off 7 more so that map-progress is 80%
+    finishNextNTasks(mrApp, it, 7);
     allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(41.0f, app.getProgress(), 0.0);
+    rmDispatcher.await();
+    Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
 
-    job.setProgress(100, 100, 20, 0);
-    allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+    // Finish off the 2 remaining maps
+    finishNextNTasks(mrApp, it, 2);
+
+    // Wait till all reduce-attempts request for containers
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.REDUCE) {
+        mrApp.waitForState(t.getAttempts().values().iterator().next(),
+            TaskAttemptState.UNASSIGNED);
+      }
+    }
 
-    job.setProgress(100, 100, 100, 100);
     allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+    rmDispatcher.await();
+    amNodeManager.nodeHeartbeat(true);
+    rmDispatcher.await();
+    allocator.schedule();
+    rmDispatcher.await();
+
+    // Wait for all reduce-tasks to be running
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.REDUCE) {
+        mrApp.waitForState(t, TaskState.RUNNING);
+      }
+    }
+
+    // Finish off 2 reduces
+    finishNextNTasks(mrApp, it, 2);
+
+    allocator.schedule();
+    rmDispatcher.await();
+    Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
+
+    // Finish off the remaining 8 reduces.
+    finishNextNTasks(mrApp, it, 8);
+    allocator.schedule();
+    rmDispatcher.await();
+    // Remaining is JobCleanup
+    Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
+  }
+
+  private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
+      throws Exception {
+    Task task;
+    for (int i = 0; i < nextN; i++) {
+      task = it.next();
+      // Mark the task's attempt done and wait for the task to succeed.
+      TaskAttempt attempt = task.getAttempts().values().iterator().next();
+      mrApp.getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+      mrApp.waitForState(task, TaskState.SUCCEEDED);
+    }
+  }
+
+    Iterator<Task> it = job.getTasks().values().iterator();
-    job.setProgress(100, 60, 0, 0);
+    // Finish off 1 map so that map-progress is 10%
+    finishNextNTasks(mrApp, it, 1);
     allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+    rmDispatcher.await();
+    Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
 
-    job.setProgress(100, 100, 0, 100);
+    // Finish off 5 more map so that map-progress is 60%
+    finishNextNTasks(mrApp, it, 5);
     allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+    rmDispatcher.await();
+    Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
+
+    // Finish off remaining map so that map-progress is 100%
+    finishNextNTasks(mrApp, it, 4);
+    allocator.schedule();
+    rmDispatcher.await();
+    Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
+    Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
 
   @Test
@@ -1000,7 +1114,6 @@ public class TestRMContainerAllocator {
 
     private MyResourceManager rm;
 
-    @SuppressWarnings("rawtypes")
     private static AppContext createAppContext(
         ApplicationAttemptId appAttemptId, Job job) {
       AppContext context = mock(AppContext.class);
@@ -1028,7 +1141,15 @@ public class TestRMContainerAllocator {
       return service;
     }
 
-    MyContainerAllocator(MyResourceManager rm, Configuration conf,
+    // Use this constructor when using a real job.
+    MyContainerAllocator(MyResourceManager rm,
+        ApplicationAttemptId appAttemptId, AppContext context) {
+      super(createMockClientService(), context);
+      this.rm = rm;
+    }
+
+    // Use this constructor when you are using a mocked job.
+    public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job) {
       super(createMockClientService(), createAppContext(appAttemptId, job));
       this.rm = rm;
@@ -1090,6 +1211,7 @@ public class TestRMContainerAllocator {
       return result;
     }
 
+    @Override
     protected void startAllocatorThread() {
       // override to NOT start thread
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index f82c1d58330..dd0a7f1c7b6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -393,6 +393,11 @@ public class TestRuntimeEstimators {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
     @Override
     public Counters getCounters() {
       throw new UnsupportedOperationException("Not supported yet.");
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 9bc18f920f2..a142c085fa6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -65,7 +65,7 @@ public class TestJobImpl {
     Task mockTask = mock(Task.class);
     Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
     tasks.put(mockTask.getID(), mockTask);
-    when(mockJob.getTasks()).thenReturn(tasks);
+    mockJob.tasks = tasks;
     when(mockJob.getState()).thenReturn(JobState.ERROR);
 
     JobEvent mockJobEvent = mock(JobEvent.class);
@@ -73,11 +73,12 @@ public class TestJobImpl {
     Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
         JobState.ERROR, state);
   }
-  
+
   @Test
   public void testCheckJobCompleteSuccess() {
 
     JobImpl mockJob = mock(JobImpl.class);
+    mockJob.tasks = new HashMap<TaskId, Task>();
     OutputCommitter mockCommitter = mock(OutputCommitter.class);
     EventHandler mockEventHandler = mock(EventHandler.class);
     JobContext mockJobContext = mock(JobContext.class);
@@ -110,7 +111,7 @@ public class TestJobImpl {
     Task mockTask = mock(Task.class);
     Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
     tasks.put(mockTask.getID(), mockTask);
-    when(mockJob.getTasks()).thenReturn(tasks);
+    mockJob.tasks = tasks;
 
     try {
       // Just in case the code breaks and reaches these calls
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
index c1b308935a7..fd828d76693 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
@@ -135,6 +135,11 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
     return report;
   }
 
+  @Override
+  public float getProgress() {
+    return 1.0f;
+  }
+
   @Override
   public JobState getState() {
     return report.getJobState();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
index d412a63864d..1b1b3b1cedf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
@@ -89,6 +89,11 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
     return jobReport;
   }
 
+  @Override
+  public float getProgress() {
+    return 1.0f;
+  }
+
   @Override
   public Counters getCounters() {
     return null;
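
Note on the change: the weighted application-progress calculation that RMCommunicator.getApplicationProgress() previously derived from the JobReport now sits behind Job.getProgress() and is computed inside JobImpl from per-task progress. A minimal standalone sketch of that weighting, with the values the updated tests assert against (weights and numbers come from the patch; the class and method names below are illustrative, not part of the patch):

    // Illustrative sketch: the weighted progress formula JobImpl.getProgress()
    // now computes and RMCommunicator reports to the ResourceManager.
    public class JobProgressSketch {

      // Setup and cleanup each carry 5%; maps and reduces split the remaining 90%.
      static float progress(int numMaps, int numReduces, float setup, float map,
          float reduce, float cleanup) {
        float mapWeight = 0f, reduceWeight = 0f;
        if (numMaps == 0 && numReduces == 0) {
          // No tasks: only setup/cleanup contribute.
        } else if (numMaps == 0) {
          reduceWeight = 0.9f;
        } else if (numReduces == 0) {
          mapWeight = 0.9f;
        } else {
          mapWeight = reduceWeight = 0.45f;
        }
        return setup * 0.05f + cleanup * 0.05f
            + map * mapWeight + reduce * reduceWeight;
      }

      public static void main(String[] args) {
        // Values asserted in testReportedAppProgress (10 maps, 10 reduces):
        System.out.println(progress(10, 10, 1f, 0.1f, 0f, 0f)); // 0.095 (1 map done)
        System.out.println(progress(10, 10, 1f, 0.8f, 0f, 0f)); // 0.41  (8 maps done)
        System.out.println(progress(10, 10, 1f, 1f, 0.2f, 0f)); // 0.59  (2 reduces done)
        System.out.println(progress(10, 10, 1f, 1f, 1f, 0f));   // 0.95  (job cleanup pending)
        // Maps-only job (10 maps, 0 reduces), as in the second test:
        System.out.println(progress(10, 0, 1f, 0.1f, 0f, 0f));  // 0.14  (1 map done)
      }
    }

The empty first branch mirrors the patch: a job with neither maps nor reduces only ever reports setup and cleanup progress, which is why the tests stop at 0.95 until job cleanup runs.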