From b775d49f35a2ddb673cce41ce6f3e23731cee301 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Wed, 27 Feb 2013 21:04:29 +0000 Subject: [PATCH] merge MAPREDUCE-4693 from trunk. Historyserver should include counters for failed tasks. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1450957 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 3 +- .../mapreduce/v2/app/job/impl/TaskImpl.java | 3 +- .../src/main/avro/Events.avpr | 4 +- .../jobhistory/JobHistoryParser.java | 2 + ...askAttemptUnsuccessfulCompletionEvent.java | 167 +++++++++++++----- .../mapreduce/jobhistory/TaskFailedEvent.java | 99 ++++++++--- .../v2/hs/TestJobHistoryParsing.java | 5 +- .../apache/hadoop/tools/rumen/JobBuilder.java | 14 +- 9 files changed, 225 insertions(+), 75 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b8bfb4e0ca1..d95289c8d1a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -40,6 +40,9 @@ Release 2.0.4-beta - UNRELEASED MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER. (Sandy Ryza via tomwhite) + MAPREDUCE-4693. History server should include counters for failed tasks. + (Xuan Gong via sseth) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 8d834c3da7c..62f00b95c0e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1183,7 +1183,8 @@ public abstract class TaskAttemptImpl implements taskAttempt.nodeRackName == null ? "UNKNOWN" : taskAttempt.nodeRackName, StringUtils.join( - LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt + LINE_SEPARATOR, taskAttempt.getDiagnostics()), + taskAttempt.getCounters(), taskAttempt .getProgressSplitBlock().burst()); return tauce; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index f0e120421b1..d01d9998aaf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -730,7 +730,8 @@ public abstract class TaskImpl implements Task, EventHandler { TypeConverter.fromYarn(task.getType()), errorSb.toString(), taskState.toString(), - taId == null ? null : TypeConverter.fromYarn(taId)); + taId == null ? null : TypeConverter.fromYarn(taId), + task.getCounters()); return taskFailedEvent; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr index 716f6e2b639..dcb9ca40ac8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr @@ -212,6 +212,7 @@ {"name": "rackname", "type": "string"}, {"name": "status", "type": "string"}, {"name": "error", "type": "string"}, + {"name": "counters", "type": "JhCounters"}, {"name": "clockSplits", "type": { "type": "array", "items": "int"}}, {"name": "cpuUsages", "type": { "type": "array", "items": "int"}}, {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}}, @@ -226,7 +227,8 @@ {"name": "finishTime", "type": "long"}, {"name": "error", "type": "string"}, {"name": "failedDueToAttempt", "type": ["null", "string"] }, - {"name": "status", "type": "string"} + {"name": "status", "type": "string"}, + {"name": "counters", "type": "JhCounters"} ] }, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index eb8db667e8a..3f8fb545298 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -295,6 +295,7 @@ public class JobHistoryParser implements HistoryEventHandler { attemptInfo.shuffleFinishTime = event.getFinishTime(); attemptInfo.sortFinishTime = event.getFinishTime(); attemptInfo.mapFinishTime = event.getFinishTime(); + attemptInfo.counters = event.getCounters(); if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status)) { //this is a successful task @@ -347,6 +348,7 @@ public class JobHistoryParser implements HistoryEventHandler { taskInfo.finishTime = event.getFinishTime(); taskInfo.error = StringInterner.weakIntern(event.getError()); taskInfo.failedDueToAttemptId = event.getFailedAttemptID(); + taskInfo.counters = event.getCounters(); info.errorInfo = "Task " + taskInfo.taskId +" failed " + taskInfo.attemptsMap.size() + " times "; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java index d2b9354b423..9b5617c01bf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; @@ -36,8 +37,24 @@ import org.apache.avro.util.Utf8; @InterfaceAudience.Private @InterfaceStability.Unstable public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { - private TaskAttemptUnsuccessfulCompletion datum = - new TaskAttemptUnsuccessfulCompletion(); + + private TaskAttemptUnsuccessfulCompletion datum = null; + + private TaskAttemptID attemptId; + private TaskType taskType; + private String status; + private long finishTime; + private String hostname; + private int port; + private String rackName; + private String error; + private Counters counters; + int[][] allSplits; + int[] clockSplits; + int[] cpuUsages; + int[] vMemKbytes; + int[] physMemKbytes; + private static final Counters EMPTY_COUNTERS = new Counters(); /** * Create an event to record the unsuccessful completion of attempts @@ -49,6 +66,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { * @param port rpc port for for the tracker * @param rackName Name of the rack where the attempt executed * @param error Error string + * @param counters Counters for the attempt * @param allSplits the "splits", or a pixelated graph of various * measurable worker node state variables against progress. * Currently there are four; wallclock time, CPU time, @@ -58,31 +76,25 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { (TaskAttemptID id, TaskType taskType, String status, long finishTime, String hostname, int port, String rackName, - String error, int[][] allSplits) { - datum.taskid = new Utf8(id.getTaskID().toString()); - datum.taskType = new Utf8(taskType.name()); - datum.attemptId = new Utf8(id.toString()); - datum.finishTime = finishTime; - datum.hostname = new Utf8(hostname); - if (rackName != null) { - datum.rackname = new Utf8(rackName); - } - datum.port = port; - datum.error = new Utf8(error); - datum.status = new Utf8(status); - - datum.clockSplits - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetWallclockTime(allSplits)); - datum.cpuUsages - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetCPUTime(allSplits)); - datum.vMemKbytes - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits)); - datum.physMemKbytes - = AvroArrayUtils.toAvro - (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits)); + String error, Counters counters, int[][] allSplits) { + this.attemptId = id; + this.taskType = taskType; + this.status = status; + this.finishTime = finishTime; + this.hostname = hostname; + this.port = port; + this.rackName = rackName; + this.error = error; + this.counters = counters; + this.allSplits = allSplits; + this.clockSplits = + ProgressSplitsBlock.arrayGetWallclockTime(allSplits); + this.cpuUsages = + ProgressSplitsBlock.arrayGetCPUTime(allSplits); + this.vMemKbytes = + ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); + this.physMemKbytes = + ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); } /** @@ -103,42 +115,109 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { (TaskAttemptID id, TaskType taskType, String status, long finishTime, String hostname, String error) { - this(id, taskType, status, finishTime, hostname, -1, "", error, null); + this(id, taskType, status, finishTime, hostname, -1, "", + error, EMPTY_COUNTERS, null); + } + + public TaskAttemptUnsuccessfulCompletionEvent + (TaskAttemptID id, TaskType taskType, + String status, long finishTime, + String hostname, int port, String rackName, + String error, int[][] allSplits) { + this(id, taskType, status, finishTime, hostname, port, + rackName, error, EMPTY_COUNTERS, null); } TaskAttemptUnsuccessfulCompletionEvent() {} - public Object getDatum() { return datum; } - public void setDatum(Object datum) { - this.datum = (TaskAttemptUnsuccessfulCompletion)datum; + public Object getDatum() { + if(datum == null) { + datum = new TaskAttemptUnsuccessfulCompletion(); + datum.taskid = new Utf8(attemptId.getTaskID().toString()); + datum.taskType = new Utf8(taskType.name()); + datum.attemptId = new Utf8(attemptId.toString()); + datum.finishTime = finishTime; + datum.hostname = new Utf8(hostname); + if (rackName != null) { + datum.rackname = new Utf8(rackName); + } + datum.port = port; + datum.error = new Utf8(error); + datum.status = new Utf8(status); + + datum.counters = EventWriter.toAvro(counters); + + datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetWallclockTime(allSplits)); + datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetCPUTime(allSplits)); + datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetVMemKbytes(allSplits)); + datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock + .arrayGetPhysMemKbytes(allSplits)); + } + return datum; + } + + + + public void setDatum(Object odatum) { + this.datum = + (TaskAttemptUnsuccessfulCompletion)odatum; + this.attemptId = + TaskAttemptID.forName(datum.attemptId.toString()); + this.taskType = + TaskType.valueOf(datum.taskType.toString()); + this.finishTime = datum.finishTime; + this.hostname = datum.hostname.toString(); + this.rackName = datum.rackname.toString(); + this.port = datum.port; + this.status = datum.status.toString(); + this.error = datum.error.toString(); + this.counters = + EventReader.fromAvro(datum.counters); + this.clockSplits = + AvroArrayUtils.fromAvro(datum.clockSplits); + this.cpuUsages = + AvroArrayUtils.fromAvro(datum.cpuUsages); + this.vMemKbytes = + AvroArrayUtils.fromAvro(datum.vMemKbytes); + this.physMemKbytes = + AvroArrayUtils.fromAvro(datum.physMemKbytes); } /** Get the task id */ - public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } + public TaskID getTaskId() { + return attemptId.getTaskID(); + } /** Get the task type */ public TaskType getTaskType() { - return TaskType.valueOf(datum.taskType.toString()); + return TaskType.valueOf(taskType.toString()); } /** Get the attempt id */ public TaskAttemptID getTaskAttemptId() { - return TaskAttemptID.forName(datum.attemptId.toString()); + return attemptId; } /** Get the finish time */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { return finishTime; } /** Get the name of the host where the attempt executed */ - public String getHostname() { return datum.hostname.toString(); } + public String getHostname() { return hostname; } /** Get the rpc port for the host where the attempt executed */ - public int getPort() { return datum.port; } + public int getPort() { return port; } /** Get the rack name of the node where the attempt ran */ public String getRackName() { - return datum.rackname == null ? null : datum.rackname.toString(); + return rackName == null ? null : rackName.toString(); } /** Get the error string */ - public String getError() { return datum.error.toString(); } + public String getError() { return error.toString(); } /** Get the task status */ - public String getTaskStatus() { return datum.status.toString(); } + public String getTaskStatus() { + return status.toString(); + } + /** Get the counters */ + Counters getCounters() { return counters; } /** Get the event type */ public EventType getEventType() { // Note that the task type can be setup/map/reduce/cleanup but the @@ -157,16 +236,16 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { public int[] getClockSplits() { - return AvroArrayUtils.fromAvro(datum.clockSplits); + return clockSplits; } public int[] getCpuUsages() { - return AvroArrayUtils.fromAvro(datum.cpuUsages); + return cpuUsages; } public int[] getVMemKbytes() { - return AvroArrayUtils.fromAvro(datum.vMemKbytes); + return vMemKbytes; } public int[] getPhysMemKbytes() { - return AvroArrayUtils.fromAvro(datum.physMemKbytes); + return physMemKbytes; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java index 77d72e75e79..0d3aea6bb13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java @@ -18,10 +18,9 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; @@ -35,7 +34,17 @@ import org.apache.avro.util.Utf8; @InterfaceAudience.Private @InterfaceStability.Unstable public class TaskFailedEvent implements HistoryEvent { - private TaskFailed datum = new TaskFailed(); + private TaskFailed datum = null; + + private TaskAttemptID failedDueToAttempt; + private TaskID id; + private TaskType taskType; + private long finishTime; + private String status; + private String error; + private Counters counters; + + private static final Counters EMPTY_COUNTERS = new Counters(); /** * Create an event to record task failure @@ -45,45 +54,87 @@ public class TaskFailedEvent implements HistoryEvent { * @param error Error String * @param status Status * @param failedDueToAttempt The attempt id due to which the task failed + * @param counters Counters for the task */ public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType, String error, String status, - TaskAttemptID failedDueToAttempt) { - datum.taskid = new Utf8(id.toString()); - datum.error = new Utf8(error); - datum.finishTime = finishTime; - datum.taskType = new Utf8(taskType.name()); - datum.failedDueToAttempt = failedDueToAttempt == null - ? null - : new Utf8(failedDueToAttempt.toString()); - datum.status = new Utf8(status); + TaskAttemptID failedDueToAttempt, Counters counters) { + this.id = id; + this.finishTime = finishTime; + this.taskType = taskType; + this.error = error; + this.status = status; + this.failedDueToAttempt = failedDueToAttempt; + this.counters = counters; } + public TaskFailedEvent(TaskID id, long finishTime, + TaskType taskType, String error, String status, + TaskAttemptID failedDueToAttempt) { + this(id, finishTime, taskType, error, status, + failedDueToAttempt, EMPTY_COUNTERS); + } + TaskFailedEvent() {} - public Object getDatum() { return datum; } - public void setDatum(Object datum) { this.datum = (TaskFailed)datum; } + public Object getDatum() { + if(datum == null) { + datum = new TaskFailed(); + datum.taskid = new Utf8(id.toString()); + datum.error = new Utf8(error); + datum.finishTime = finishTime; + datum.taskType = new Utf8(taskType.name()); + datum.failedDueToAttempt = + failedDueToAttempt == null + ? null + : new Utf8(failedDueToAttempt.toString()); + datum.status = new Utf8(status); + datum.counters = EventWriter.toAvro(counters); + } + return datum; + } + + public void setDatum(Object odatum) { + this.datum = (TaskFailed)odatum; + this.id = + TaskID.forName(datum.taskid.toString()); + this.taskType = + TaskType.valueOf(datum.taskType.toString()); + this.finishTime = datum.finishTime; + this.error = datum.error.toString(); + this.failedDueToAttempt = + datum.failedDueToAttempt == null + ? null + : TaskAttemptID.forName( + datum.failedDueToAttempt.toString()); + this.status = datum.status.toString(); + this.counters = + EventReader.fromAvro(datum.counters); + } /** Get the task id */ - public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } + public TaskID getTaskId() { return id; } /** Get the error string */ - public String getError() { return datum.error.toString(); } + public String getError() { return error; } /** Get the finish time of the attempt */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { + return finishTime; + } /** Get the task type */ public TaskType getTaskType() { - return TaskType.valueOf(datum.taskType.toString()); + return taskType; } /** Get the attempt id due to which the task failed */ public TaskAttemptID getFailedAttemptID() { - return datum.failedDueToAttempt == null - ? null - : TaskAttemptID.forName(datum.failedDueToAttempt.toString()); + return failedDueToAttempt; } /** Get the task status */ - public String getTaskStatus() { return datum.status.toString(); } + public String getTaskStatus() { return status; } + /** Get task counters */ + public Counters getCounters() { return counters; } /** Get the event type */ - public EventType getEventType() { return EventType.TASK_FAILED; } + public EventType getEventType() { + return EventType.TASK_FAILED; + } - } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index e24cf05d790..2eb6aaa133a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -404,7 +404,7 @@ public class TestJobHistoryParsing { } } - @Test + @Test (timeout=5000) public void testCountersForFailedTask() throws Exception { LOG.info("STARTING testCountersForFailedTask"); try { @@ -455,6 +455,9 @@ public class TestJobHistoryParsing { CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue()); Assert.assertNotNull("completed task report has null counters", ct.getReport().getCounters()); + //Make sure all the completedTask has counters, and the counters are not empty + Assert.assertTrue(ct.getReport().getCounters() + .getAllCounterGroups().size() > 0); } } finally { LOG.info("FINISHED testCountersForFailedTask"); diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/JobBuilder.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/JobBuilder.java index 479e62e5518..2bc5062f9a5 100644 --- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/JobBuilder.java +++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/JobBuilder.java @@ -83,6 +83,9 @@ public class JobBuilder { private Map allHosts = new HashMap(); + private org.apache.hadoop.mapreduce.jobhistory.JhCounters EMPTY_COUNTERS = + new org.apache.hadoop.mapreduce.jobhistory.JhCounters(); + /** * The number of splits a task can have, before we ignore them all. */ @@ -459,7 +462,10 @@ public class JobBuilder { TaskFailed t = (TaskFailed)(event.getDatum()); task.putDiagnosticInfo(t.error.toString()); task.putFailedDueToAttemptId(t.failedDueToAttempt.toString()); - // No counters in TaskFailedEvent + org.apache.hadoop.mapreduce.jobhistory.JhCounters counters = + ((TaskFailed) event.getDatum()).counters; + task.incorporateCounters( + counters == null ? EMPTY_COUNTERS : counters); } private void processTaskAttemptUnsuccessfulCompletionEvent( @@ -481,7 +487,10 @@ public class JobBuilder { } attempt.setFinishTime(event.getFinishTime()); - + org.apache.hadoop.mapreduce.jobhistory.JhCounters counters = + ((TaskAttemptUnsuccessfulCompletion) event.getDatum()).counters; + attempt.incorporateCounters( + counters == null ? EMPTY_COUNTERS : counters); attempt.arraySetClockSplits(event.getClockSplits()); attempt.arraySetCpuUsages(event.getCpuUsages()); attempt.arraySetVMemKbytes(event.getVMemKbytes()); @@ -489,7 +498,6 @@ public class JobBuilder { TaskAttemptUnsuccessfulCompletion t = (TaskAttemptUnsuccessfulCompletion) (event.getDatum()); attempt.putDiagnosticInfo(t.error.toString()); - // No counters in TaskAttemptUnsuccessfulCompletionEvent } private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {