merge -r 1309400:1309401 from trunk. FIXES: MAPREDUCE-3672
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1309403 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a32553b31c
commit
93af1a43be
|
@ -141,6 +141,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-4020. Web services returns incorrect JSON for deep queue tree
|
MAPREDUCE-4020. Web services returns incorrect JSON for deep queue tree
|
||||||
(Anupam Seth via tgraves)
|
(Anupam Seth via tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-3672. Killed maps shouldn't be counted towards
|
||||||
|
JobCounter.NUM_FAILED_MAPS. (Anupam Seth via tgraves)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -988,6 +988,23 @@ public abstract class TaskAttemptImpl implements
|
||||||
}
|
}
|
||||||
return jce;
|
return jce;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
|
||||||
|
TaskAttemptImpl taskAttempt) {
|
||||||
|
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
|
||||||
|
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
|
||||||
|
|
||||||
|
long slotMillisIncrement = computeSlotMillis(taskAttempt);
|
||||||
|
|
||||||
|
if (taskType == TaskType.MAP) {
|
||||||
|
jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
|
||||||
|
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
|
||||||
|
} else {
|
||||||
|
jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
|
||||||
|
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
|
||||||
|
}
|
||||||
|
return jce;
|
||||||
|
}
|
||||||
|
|
||||||
private static
|
private static
|
||||||
TaskAttemptUnsuccessfulCompletionEvent
|
TaskAttemptUnsuccessfulCompletionEvent
|
||||||
|
@ -1214,8 +1231,13 @@ public abstract class TaskAttemptImpl implements
|
||||||
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
||||||
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
||||||
finalState);
|
finalState);
|
||||||
taskAttempt.eventHandler
|
if(finalState == TaskAttemptState.FAILED) {
|
||||||
|
taskAttempt.eventHandler
|
||||||
.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
|
.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
|
||||||
|
} else if(finalState == TaskAttemptState.KILLED) {
|
||||||
|
taskAttempt.eventHandler
|
||||||
|
.handle(createJobCounterUpdateEventTAKilled(taskAttempt));
|
||||||
|
}
|
||||||
taskAttempt.eventHandler.handle(new JobHistoryEvent(
|
taskAttempt.eventHandler.handle(new JobHistoryEvent(
|
||||||
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
||||||
} else {
|
} else {
|
||||||
|
@ -1441,7 +1463,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
taskAttempt.setFinishTime();
|
taskAttempt.setFinishTime();
|
||||||
if (taskAttempt.getLaunchTime() != 0) {
|
if (taskAttempt.getLaunchTime() != 0) {
|
||||||
taskAttempt.eventHandler
|
taskAttempt.eventHandler
|
||||||
.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
|
.handle(createJobCounterUpdateEventTAKilled(taskAttempt));
|
||||||
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
||||||
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
|
||||||
TaskAttemptState.KILLED);
|
TaskAttemptState.KILLED);
|
||||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
public enum JobCounter {
|
public enum JobCounter {
|
||||||
NUM_FAILED_MAPS,
|
NUM_FAILED_MAPS,
|
||||||
NUM_FAILED_REDUCES,
|
NUM_FAILED_REDUCES,
|
||||||
|
NUM_KILLED_MAPS,
|
||||||
|
NUM_KILLED_REDUCES,
|
||||||
TOTAL_LAUNCHED_MAPS,
|
TOTAL_LAUNCHED_MAPS,
|
||||||
TOTAL_LAUNCHED_REDUCES,
|
TOTAL_LAUNCHED_REDUCES,
|
||||||
OTHER_LOCAL_MAPS,
|
OTHER_LOCAL_MAPS,
|
||||||
|
|
|
@ -16,6 +16,8 @@ CounterGroupName= Job Counters
|
||||||
|
|
||||||
NUM_FAILED_MAPS.name= Failed map tasks
|
NUM_FAILED_MAPS.name= Failed map tasks
|
||||||
NUM_FAILED_REDUCES.name= Failed reduce tasks
|
NUM_FAILED_REDUCES.name= Failed reduce tasks
|
||||||
|
NUM_KILLED_MAPS.name= Killed map tasks
|
||||||
|
NUM_KILLED_REDUCES.name= Killed reduce tasks
|
||||||
TOTAL_LAUNCHED_MAPS.name= Launched map tasks
|
TOTAL_LAUNCHED_MAPS.name= Launched map tasks
|
||||||
TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks
|
TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks
|
||||||
OTHER_LOCAL_MAPS.name= Other local map tasks
|
OTHER_LOCAL_MAPS.name= Other local map tasks
|
||||||
|
|
|
@ -195,6 +195,8 @@ public class TestJobCleanup {
|
||||||
RunningJob job = jobClient.submitJob(jc);
|
RunningJob job = jobClient.submitJob(jc);
|
||||||
JobID id = job.getID();
|
JobID id = job.getID();
|
||||||
job.waitForCompletion();
|
job.waitForCompletion();
|
||||||
|
Counters counters = job.getCounters();
|
||||||
|
assertTrue("No. of failed maps should be 1",counters.getCounter(JobCounter.NUM_FAILED_MAPS) == 1);
|
||||||
|
|
||||||
if (fileName != null) {
|
if (fileName != null) {
|
||||||
Path testFile = new Path(outDir, fileName);
|
Path testFile = new Path(outDir, fileName);
|
||||||
|
@ -240,6 +242,9 @@ public class TestJobCleanup {
|
||||||
job.killJob(); // kill the job
|
job.killJob(); // kill the job
|
||||||
|
|
||||||
job.waitForCompletion(); // wait for the job to complete
|
job.waitForCompletion(); // wait for the job to complete
|
||||||
|
|
||||||
|
counters = job.getCounters();
|
||||||
|
assertTrue("No. of killed maps should be 1", counters.getCounter(JobCounter.NUM_KILLED_MAPS) == 1);
|
||||||
|
|
||||||
if (fileName != null) {
|
if (fileName != null) {
|
||||||
Path testFile = new Path(outDir, fileName);
|
Path testFile = new Path(outDir, fileName);
|
||||||
|
|
|
@ -234,8 +234,10 @@ public class TestSpeculativeExecution {
|
||||||
.getValue());
|
.getValue());
|
||||||
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
|
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
|
||||||
.getValue());
|
.getValue());
|
||||||
Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
|
Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
|
||||||
.getValue());
|
.getValue());
|
||||||
|
Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_KILLED_MAPS)
|
||||||
|
.getValue());
|
||||||
|
|
||||||
/*----------------------------------------------------------------------
|
/*----------------------------------------------------------------------
|
||||||
* Test that Reducer speculates if REDUCE_SPECULATIVE is true and
|
* Test that Reducer speculates if REDUCE_SPECULATIVE is true and
|
||||||
|
|
Loading…
Reference in New Issue