MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1224995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-12-27 19:54:51 +00:00
parent cd90b82227
commit fcbad14a3d
14 changed files with 318 additions and 144 deletions

View File

@ -179,6 +179,8 @@ Release 0.23.1 - Unreleased
immediately after downloading a resource instead of always waiting for a immediately after downloading a resource instead of always waiting for a
second. (Siddarth Seth via vinodkv) second. (Siddarth Seth via vinodkv)
MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
BUG FIXES BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

View File

@ -228,7 +228,7 @@ public class MRAppMaster extends CompositeService {
+ recoveryEnabled + " recoverySupportedByCommitter: " + recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " ApplicationAttemptID: " + recoverySupportedByCommitter + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId()); + appAttemptID.getAttemptId());
dispatcher = new AsyncDispatcher(); dispatcher = createDispatcher();
addIfService(dispatcher); addIfService(dispatcher);
} }
@ -291,6 +291,10 @@ public class MRAppMaster extends CompositeService {
super.init(conf); super.init(conf);
} // end of init() } // end of init()
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
private OutputCommitter createOutputCommitter(Configuration conf) { private OutputCommitter createOutputCommitter(Configuration conf) {
OutputCommitter committer = null; OutputCommitter committer = null;

View File

@ -53,6 +53,7 @@ public interface Job {
int getTotalReduces(); int getTotalReduces();
int getCompletedMaps(); int getCompletedMaps();
int getCompletedReduces(); int getCompletedReduces();
float getProgress();
boolean isUber(); boolean isUber();
String getUserName(); String getUserName();
String getQueueName(); String getQueueName();

View File

@ -128,6 +128,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final String username; private final String username;
private final OutputCommitter committer; private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs; private final Map<JobACL, AccessControlList> jobACLs;
private float setupWeight = 0.05f;
private float cleanupWeight = 0.05f;
private float mapWeight = 0.0f;
private float reduceWeight = 0.0f;
private final Set<TaskId> completedTasksFromPreviousRun; private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos; private final List<AMInfo> amInfos;
private final Lock readLock; private final Lock readLock;
@ -147,7 +151,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final long appSubmitTime; private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false; private boolean lazyTasksCopyNeeded = false;
private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>(); volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = newCounters(); private Counters jobCounters = newCounters();
// FIXME: // FIXME:
// //
@ -353,6 +357,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private long startTime; private long startTime;
private long finishTime; private long finishTime;
private float setupProgress; private float setupProgress;
private float mapProgress;
private float reduceProgress;
private float cleanupProgress; private float cleanupProgress;
private boolean isUber = false; private boolean isUber = false;
@ -587,30 +593,51 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} }
computeProgress();
return MRBuilderUtils.newJobReport(jobId, jobName, username, state, return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, appSubmitTime, startTime, finishTime, setupProgress,
computeProgress(mapTasks), computeProgress(reduceTasks), this.mapProgress, this.reduceProgress,
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
private float computeProgress(Set<TaskId> taskIds) { @Override
readLock.lock(); public float getProgress() {
this.readLock.lock();
try { try {
float progress = 0; computeProgress();
for (TaskId taskId : taskIds) { return (this.setupProgress * this.setupWeight + this.cleanupProgress
Task task = tasks.get(taskId); * this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress
progress += task.getProgress(); * this.reduceWeight);
}
int taskIdsSize = taskIds.size();
if (taskIdsSize != 0) {
progress = progress/taskIdsSize;
}
return progress;
} finally { } finally {
readLock.unlock(); this.readLock.unlock();
}
}
private void computeProgress() {
this.readLock.lock();
try {
float mapProgress = 0f;
float reduceProgress = 0f;
for (Task task : this.tasks.values()) {
if (task.getType() == TaskType.MAP) {
mapProgress += task.getProgress();
} else {
reduceProgress += task.getProgress();
}
}
if (this.numMapTasks != 0) {
mapProgress = mapProgress / this.numMapTasks;
}
if (this.numReduceTasks != 0) {
reduceProgress = reduceProgress / this.numReduceTasks;
}
this.mapProgress = mapProgress;
this.reduceProgress = reduceProgress;
} finally {
this.readLock.unlock();
} }
} }
@ -731,7 +758,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
static JobState checkJobCompleteSuccess(JobImpl job) { static JobState checkJobCompleteSuccess(JobImpl job) {
// check for Job success // check for Job success
if (job.completedTaskCount == job.getTasks().size()) { if (job.completedTaskCount == job.tasks.size()) {
try { try {
// Commit job & do cleanup // Commit job & do cleanup
job.getCommitter().commitJob(job.getJobContext()); job.getCommitter().commitJob(job.getJobContext());
@ -970,6 +997,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (job.numMapTasks == 0 && job.numReduceTasks == 0) { if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
} }
checkTaskLimits(); checkTaskLimits();

View File

@ -376,7 +376,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
try { try {
TaskAttempt bestAttempt = selectBestAttempt(); TaskAttempt bestAttempt = selectBestAttempt();
if (bestAttempt == null) { if (bestAttempt == null) {
return 0; return 0f;
} }
return bestAttempt.getProgress(); return bestAttempt.getProgress();
} finally { } finally {
@ -457,9 +457,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
result = at; //The first time around result = at; //The first time around
} }
// calculate the best progress // calculate the best progress
if (at.getProgress() > progress) { float attemptProgress = at.getProgress();
if (attemptProgress > progress) {
result = at; result = at;
progress = at.getProgress(); progress = attemptProgress;
} }
} }
return result; return result;

View File

@ -128,25 +128,7 @@ public abstract class RMCommunicator extends AbstractService {
protected float getApplicationProgress() { protected float getApplicationProgress() {
// For now just a single job. In future when we have a DAG, we need an // For now just a single job. In future when we have a DAG, we need an
// aggregate progress. // aggregate progress.
JobReport report = this.job.getReport(); return this.job.getProgress();
float setupWeight = 0.05f;
float cleanupWeight = 0.05f;
float mapWeight = 0.0f;
float reduceWeight = 0.0f;
int numMaps = this.job.getTotalMaps();
int numReduces = this.job.getTotalReduces();
if (numMaps == 0 && numReduces == 0) {
} else if (numMaps == 0) {
reduceWeight = 0.9f;
} else if (numReduces == 0) {
mapWeight = 0.9f;
} else {
mapWeight = reduceWeight = 0.45f;
}
return (report.getSetupProgress() * setupWeight
+ report.getCleanupProgress() * cleanupWeight
+ report.getMapProgress() * mapWeight + report.getReduceProgress()
* reduceWeight);
} }
protected void register() { protected void register() {

View File

@ -161,7 +161,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
" finishedContainers=" + " finishedContainers=" +
response.getCompletedContainersStatuses().size() + response.getCompletedContainersStatuses().size() +
" resourcelimit=" + availableResources + " resourcelimit=" + availableResources +
"knownNMs=" + clusterNmCount); " knownNMs=" + clusterNmCount);
ask.clear(); ask.clear();
release.clear(); release.clear();

View File

@ -115,7 +115,8 @@ public class MRApp extends MRAppMaster {
applicationId.setId(0); applicationId.setId(0);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) { public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1); this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
} }
@ -141,10 +142,17 @@ public class MRApp extends MRAppMaster {
return containerId; return containerId;
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
super(getApplicationAttemptId(applicationId, startCount), getContainerId( this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, System
.currentTimeMillis()); .currentTimeMillis());
this.testWorkDir = new File("target", testName); this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath()); testAbsPath = new Path(testWorkDir.getAbsolutePath());
@ -205,9 +213,9 @@ public class MRApp extends MRAppMaster {
TaskReport report = task.getReport(); TaskReport report = task.getReport();
while (!finalState.equals(report.getTaskState()) && while (!finalState.equals(report.getTaskState()) &&
timeoutSecs++ < 20) { timeoutSecs++ < 20) {
System.out.println("Task State is : " + report.getTaskState() + System.out.println("Task State for " + task.getID() + " is : "
" Waiting for state : " + finalState + + report.getTaskState() + " Waiting for state : " + finalState
" progress : " + report.getProgress()); + " progress : " + report.getProgress());
report = task.getReport(); report = task.getReport();
Thread.sleep(500); Thread.sleep(500);
} }

View File

@ -425,6 +425,11 @@ public class MockJobs extends MockApps {
return report; return report;
} }
@Override
public float getProgress() {
return 0;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return counters; return counters;

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -36,15 +37,20 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
@ -78,6 +84,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@SuppressWarnings("unchecked")
public class TestRMContainerAllocator { public class TestRMContainerAllocator {
static final Log LOG = LogFactory static final Log LOG = LogFactory
@ -338,98 +345,155 @@ public class TestRMContainerAllocator {
} }
} }
private static class FakeJob extends JobImpl {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) {
super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
appAttemptID, conf, null, null, null, null, null, null, null, null,
true, null, System.currentTimeMillis(), null);
this.jobId = getID();
this.numMaps = numMaps;
this.numReduces = numReduces;
}
private float setupProgress;
private float mapProgress;
private float reduceProgress;
private float cleanupProgress;
private final int numMaps;
private final int numReduces;
private JobId jobId;
void setProgress(float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress) {
this.setupProgress = setupProgress;
this.mapProgress = mapProgress;
this.reduceProgress = reduceProgress;
this.cleanupProgress = cleanupProgress;
}
@Override
public int getTotalMaps() { return this.numMaps; }
@Override
public int getTotalReduces() { return this.numReduces;}
@Override
public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
}
}
@Test @Test
public void testReportedAppProgress() throws Exception { public void testReportedAppProgress() throws Exception {
LOG.info("Running testReportedAppProgress"); LOG.info("Running testReportedAppProgress");
Configuration conf = new Configuration(); Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf); final MyResourceManager rm = new MyResourceManager(conf);
rm.start(); rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher(); .getDispatcher();
// Submit the application // Submit the application
RMApp app = rm.submitApp(1024); RMApp rmApp = rm.submitApp(1024);
dispatcher.await(); rmDispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
amNodeManager.nodeHeartbeat(true); amNodeManager.nodeHeartbeat(true);
dispatcher.await(); rmDispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId(); .getAppAttemptId();
rm.sendAMLaunched(appAttemptId); rm.sendAMLaunched(appAttemptId);
dispatcher.await(); rmDispatcher.await();
FakeJob job = new FakeJob(appAttemptId, conf, 2, 2); MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
appAttemptId, job); @Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
return new MyContainerAllocator(rm, appAttemptId, context);
};
};
Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
mrApp.submit(conf);
Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
.getValue();
DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
.getContainerAllocator();
mrApp.waitForState(job, JobState.RUNNING);
amDispatcher.await();
// Wait till all map-attempts request for containers
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.MAP) {
mrApp.waitForState(t.getAttempts().values().iterator().next(),
TaskAttemptState.UNASSIGNED);
}
}
amDispatcher.await();
allocator.schedule();
rmDispatcher.await();
amNodeManager.nodeHeartbeat(true);
rmDispatcher.await();
allocator.schedule();
rmDispatcher.await();
// Wait for all map-tasks to be running
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.MAP) {
mrApp.waitForState(t, TaskState.RUNNING);
}
}
allocator.schedule(); // Send heartbeat allocator.schedule(); // Send heartbeat
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(0.0, app.getProgress(), 0.0); Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 10, 0, 0); // Finish off 1 map.
Iterator<Task> it = job.getTasks().values().iterator();
finishNextNTasks(mrApp, it, 1);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(9.5f, app.getProgress(), 0.0); Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 80, 0, 0); // Finish off 7 more so that map-progress is 80%
finishNextNTasks(mrApp, it, 7);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(41.0f, app.getProgress(), 0.0); Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 100, 20, 0); // Finish off the 2 remaining maps
allocator.schedule(); finishNextNTasks(mrApp, it, 2);
dispatcher.await();
Assert.assertEquals(59.0f, app.getProgress(), 0.0); // Wait till all reduce-attempts request for containers
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.REDUCE) {
mrApp.waitForState(t.getAttempts().values().iterator().next(),
TaskAttemptState.UNASSIGNED);
}
}
job.setProgress(100, 100, 100, 100);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(100.0f, app.getProgress(), 0.0); amNodeManager.nodeHeartbeat(true);
rmDispatcher.await();
allocator.schedule();
rmDispatcher.await();
// Wait for all reduce-tasks to be running
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.REDUCE) {
mrApp.waitForState(t, TaskState.RUNNING);
}
}
// Finish off 2 reduces
finishNextNTasks(mrApp, it, 2);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off the remaining 8 reduces.
finishNextNTasks(mrApp, it, 8);
allocator.schedule();
rmDispatcher.await();
// Remaining is JobCleanup
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
}
private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
throws Exception {
Task task;
for (int i=0; i<nextN; i++) {
task = it.next();
finishTask(mrApp, task);
}
}
private void finishTask(MRApp mrApp, Task task) throws Exception {
TaskAttempt attempt = task.getAttempts().values().iterator().next();
mrApp.getContext().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
mrApp.waitForState(task, TaskState.SUCCEEDED);
} }
@Test @Test
@ -438,46 +502,96 @@ public class TestRMContainerAllocator {
LOG.info("Running testReportedAppProgressWithOnlyMaps"); LOG.info("Running testReportedAppProgressWithOnlyMaps");
Configuration conf = new Configuration(); Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf); final MyResourceManager rm = new MyResourceManager(conf);
rm.start(); rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher(); .getDispatcher();
// Submit the application // Submit the application
RMApp app = rm.submitApp(1024); RMApp rmApp = rm.submitApp(1024);
dispatcher.await(); rmDispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
amNodeManager.nodeHeartbeat(true); amNodeManager.nodeHeartbeat(true);
dispatcher.await(); rmDispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId(); .getAppAttemptId();
rm.sendAMLaunched(appAttemptId); rm.sendAMLaunched(appAttemptId);
dispatcher.await(); rmDispatcher.await();
FakeJob job = new FakeJob(appAttemptId, conf, 2, 0); MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
appAttemptId, job); @Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
return new MyContainerAllocator(rm, appAttemptId, context);
};
};
Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
mrApp.submit(conf);
Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
.getValue();
DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
.getContainerAllocator();
mrApp.waitForState(job, JobState.RUNNING);
amDispatcher.await();
// Wait till all map-attempts request for containers
for (Task t : job.getTasks().values()) {
mrApp.waitForState(t.getAttempts().values().iterator().next(),
TaskAttemptState.UNASSIGNED);
}
amDispatcher.await();
allocator.schedule();
rmDispatcher.await();
amNodeManager.nodeHeartbeat(true);
rmDispatcher.await();
allocator.schedule();
rmDispatcher.await();
// Wait for all map-tasks to be running
for (Task t : job.getTasks().values()) {
mrApp.waitForState(t, TaskState.RUNNING);
}
allocator.schedule(); // Send heartbeat allocator.schedule(); // Send heartbeat
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(0.0, app.getProgress(), 0.0); Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 10, 0, 0); Iterator<Task> it = job.getTasks().values().iterator();
allocator.schedule();
dispatcher.await();
Assert.assertEquals(14f, app.getProgress(), 0.0);
job.setProgress(100, 60, 0, 0); // Finish off 1 map so that map-progress is 10%
finishNextNTasks(mrApp, it, 1);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(59.0f, app.getProgress(), 0.0); Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 100, 0, 100); // Finish off 5 more map so that map-progress is 60%
finishNextNTasks(mrApp, it, 5);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(100.0f, app.getProgress(), 0.0); Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off remaining map so that map-progress is 100%
finishNextNTasks(mrApp, it, 4);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
} }
@Test @Test
@ -1000,7 +1114,6 @@ public class TestRMContainerAllocator {
private MyResourceManager rm; private MyResourceManager rm;
@SuppressWarnings("rawtypes")
private static AppContext createAppContext( private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job) { ApplicationAttemptId appAttemptId, Job job) {
AppContext context = mock(AppContext.class); AppContext context = mock(AppContext.class);
@ -1028,7 +1141,15 @@ public class TestRMContainerAllocator {
return service; return service;
} }
MyContainerAllocator(MyResourceManager rm, Configuration conf, // Use this constructor when using a real job.
MyContainerAllocator(MyResourceManager rm,
ApplicationAttemptId appAttemptId, AppContext context) {
super(createMockClientService(), context);
this.rm = rm;
}
// Use this constructor when you are using a mocked job.
public MyContainerAllocator(MyResourceManager rm, Configuration conf,
ApplicationAttemptId appAttemptId, Job job) { ApplicationAttemptId appAttemptId, Job job) {
super(createMockClientService(), createAppContext(appAttemptId, job)); super(createMockClientService(), createAppContext(appAttemptId, job));
this.rm = rm; this.rm = rm;
@ -1090,6 +1211,7 @@ public class TestRMContainerAllocator {
return result; return result;
} }
@Override
protected void startAllocatorThread() { protected void startAllocatorThread() {
// override to NOT start thread // override to NOT start thread
} }

View File

@ -393,6 +393,11 @@ public class TestRuntimeEstimators {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override
public float getProgress() {
return 0;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");

View File

@ -65,7 +65,7 @@ public class TestJobImpl {
Task mockTask = mock(Task.class); Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>(); Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask); tasks.put(mockTask.getID(), mockTask);
when(mockJob.getTasks()).thenReturn(tasks); mockJob.tasks = tasks;
when(mockJob.getState()).thenReturn(JobState.ERROR); when(mockJob.getState()).thenReturn(JobState.ERROR);
JobEvent mockJobEvent = mock(JobEvent.class); JobEvent mockJobEvent = mock(JobEvent.class);
@ -73,11 +73,12 @@ public class TestJobImpl {
Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition", Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
JobState.ERROR, state); JobState.ERROR, state);
} }
@Test @Test
public void testCheckJobCompleteSuccess() { public void testCheckJobCompleteSuccess() {
JobImpl mockJob = mock(JobImpl.class); JobImpl mockJob = mock(JobImpl.class);
mockJob.tasks = new HashMap<TaskId, Task>();
OutputCommitter mockCommitter = mock(OutputCommitter.class); OutputCommitter mockCommitter = mock(OutputCommitter.class);
EventHandler mockEventHandler = mock(EventHandler.class); EventHandler mockEventHandler = mock(EventHandler.class);
JobContext mockJobContext = mock(JobContext.class); JobContext mockJobContext = mock(JobContext.class);
@ -110,7 +111,7 @@ public class TestJobImpl {
Task mockTask = mock(Task.class); Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>(); Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask); tasks.put(mockTask.getID(), mockTask);
when(mockJob.getTasks()).thenReturn(tasks); mockJob.tasks = tasks;
try { try {
// Just in case the code breaks and reaches these calls // Just in case the code breaks and reaches these calls

View File

@ -135,6 +135,11 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
return report; return report;
} }
@Override
public float getProgress() {
return 1.0f;
}
@Override @Override
public JobState getState() { public JobState getState() {
return report.getJobState(); return report.getJobState();

View File

@ -89,6 +89,11 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
return jobReport; return jobReport;
} }
@Override
public float getProgress() {
return 1.0f;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return null; return null;