MAPREDUCE-2800. Set final progress for tasks to ensure all task information is correctly logged to JobHistory. Contributed by Siddharth Seth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1165930 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3a3f277017
commit
03c1015d6e
|
@ -1207,6 +1207,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-2687. Fix NodeManager to use the right version of
|
||||
LocalDirAllocator.getLocalPathToWrite. (mahadev & acmurthy)
|
||||
|
||||
MAPREDUCE-2800. Set final progress for tasks to ensure all task information
|
||||
is correctly logged to JobHistory. (Siddharth Seth via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1380,6 +1380,8 @@ public abstract class TaskAttemptImpl implements
|
|||
// for it
|
||||
taskAttempt.taskAttemptListener.unregister(
|
||||
taskAttempt.attemptId, taskAttempt.jvmID);
|
||||
taskAttempt.reportedStatus.progress = 1.0f;
|
||||
taskAttempt.updateProgressSplits();
|
||||
//send the cleanup event to containerLauncher
|
||||
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
|
||||
taskAttempt.attemptId,
|
||||
|
|
|
@ -1736,6 +1736,7 @@ class MapTask extends Task {
|
|||
indexCacheList.get(0).writeToFile(
|
||||
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
|
||||
}
|
||||
sortPhase.complete();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1776,6 +1777,7 @@ class MapTask extends Task {
|
|||
} finally {
|
||||
finalOut.close();
|
||||
}
|
||||
sortPhase.complete();
|
||||
return;
|
||||
}
|
||||
{
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.Mapper;
|
|||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.TaskID;
|
||||
import org.apache.hadoop.mapreduce.TaskReport;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
|
||||
|
@ -151,7 +152,7 @@ public class TestMRJobs {
|
|||
Assert.assertTrue(succeeded);
|
||||
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
|
||||
verifySleepJobCounters(job);
|
||||
|
||||
verifyTaskProgress(job);
|
||||
|
||||
// TODO later: add explicit "isUber()" checks of some sort (extend
|
||||
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
|
||||
|
@ -173,6 +174,18 @@ public class TestMRJobs {
|
|||
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
|
||||
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
|
||||
}
|
||||
|
||||
protected void verifyTaskProgress(Job job) throws InterruptedException,
|
||||
IOException {
|
||||
for (TaskReport taskReport : job.getTaskReports(TaskType.MAP)) {
|
||||
Assert.assertTrue(0.9999f < taskReport.getProgress()
|
||||
&& 1.0001f > taskReport.getProgress());
|
||||
}
|
||||
for (TaskReport taskReport : job.getTaskReports(TaskType.REDUCE)) {
|
||||
Assert.assertTrue(0.9999f < taskReport.getProgress()
|
||||
&& 1.0001f > taskReport.getProgress());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRandomWriter() throws IOException, InterruptedException,
|
||||
|
@ -198,6 +211,7 @@ public class TestMRJobs {
|
|||
boolean succeeded = job.waitForCompletion(true);
|
||||
Assert.assertTrue(succeeded);
|
||||
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
|
||||
|
||||
// Make sure there are three files in the output-dir
|
||||
|
||||
RemoteIterator<FileStatus> iterator =
|
||||
|
|
Loading…
Reference in New Issue