diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index da28bc607de..a6031c00ab8 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -313,6 +313,9 @@ Release 2.9.0 - UNRELEASED
MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
mapreduce. (Sidharta Seethana via vvasudev)
+ MAPREDUCE-6627. Add machine-readable output to mapred job -history
+ command (rkanter)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
index 61f3c533b81..f0952bc1ac8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
@@ -42,6 +42,11 @@
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
+    <dependency>
+      <groupId>org.skyscreamer</groupId>
+      <artifactId>jsonassert</artifactId>
+      <scope>test</scope>
+    </dependency>
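The new `jsonassert` test dependency provides order-insensitive JSON comparison, which lets tests check the new machine-readable output without depending on key ordering. A minimal sketch of the API, with hypothetical expected/actual strings (not taken from the patch):

```java
import org.json.JSONException;
import org.skyscreamer.jsonassert.JSONAssert;

public class JsonAssertSketch {
  public static void main(String[] args) throws JSONException {
    // Non-strict mode (third argument false) ignores member ordering,
    // so logically equal documents compare as equal.
    String expected = "{\"user\":\"rkanter\",\"status\":\"SUCCEEDED\"}";
    String actual   = "{\"status\":\"SUCCEEDED\",\"user\":\"rkanter\"}";
    JSONAssert.assertEquals(expected, actual, false);
  }
}
```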
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
index f343d7c2e2d..e5db2e5a1c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
@@ -19,13 +19,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.text.DecimalFormat;
-import java.text.Format;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Comparator;
+import java.io.PrintStream;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -37,46 +32,52 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.TaskStatus;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.util.HostUtil;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
/**
- * HistoryViewer is used to parse and view the JobHistory files
- *
+ * HistoryViewer is used to parse and view the JobHistory files. The history
+ * can be printed in a human-readable format or in a machine-readable JSON
+ * format using the {@link HistoryViewerPrinter}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HistoryViewer {
private static final Log LOG = LogFactory.getLog(HistoryViewer.class);
- private static final SimpleDateFormat dateFormat =
- new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
private FileSystem fs;
private JobInfo job;
- private String jobId;
- private boolean printAll;
+ private HistoryViewerPrinter jhvp;
+ public static final String HUMAN_FORMAT = "human";
+ public static final String JSON_FORMAT = "json";
-/**
- * Constructs the HistoryViewer object
- * @param historyFile The fully qualified Path of the History File
- * @param conf The Configuration file
- * @param printAll Toggle to print all status to only killed/failed status
- * @throws IOException
- */
- public HistoryViewer(String historyFile,
- Configuration conf,
+ /**
+ * Constructs the HistoryViewer object.
+ * @param historyFile the fully qualified Path of the History File
+ * @param conf the Configuration to use
+ * @param printAll whether to print all task statuses or only killed/failed ones
+ * @throws IOException when there is a problem parsing the history file
+ */
+ public HistoryViewer(String historyFile, Configuration conf,
boolean printAll) throws IOException {
- this.printAll = printAll;
+ this(historyFile, conf, printAll, HUMAN_FORMAT);
+ }
+
+ /**
+ * Constructs the HistoryViewer object.
+ * @param historyFile the fully qualified Path of the History File
+ * @param conf the Configuration to use
+ * @param printAll whether to print all task statuses or only killed/failed ones
+ * @param format the output format to use
+ * @throws IOException when there is a problem parsing the history file
+ */
+ public HistoryViewer(String historyFile, Configuration conf, boolean printAll,
+ String format) throws IOException {
String errorMsg = "Unable to initialize History Viewer";
try {
Path jobFile = new Path(historyFile);
@@ -101,366 +102,41 @@ public class HistoryViewer {
}
JobHistoryParser parser = new JobHistoryParser(fs, jobFile);
job = parser.parse();
- jobId = job.getJobId().toString();
- } catch(Exception e) {
+ String scheme = WebAppUtils.getHttpSchemePrefix(fs.getConf());
+ if (HUMAN_FORMAT.equalsIgnoreCase(format)) {
+ jhvp = new HumanReadableHistoryViewerPrinter(job, printAll, scheme);
+ } else if (JSON_FORMAT.equalsIgnoreCase(format)) {
+ jhvp = new JSONHistoryViewerPrinter(job, printAll, scheme);
+ } else {
+ System.err.println("Invalid format specified: " + format);
+ throw new IllegalArgumentException(errorMsg);
+ }
+ } catch(IOException e) {
throw new IOException(errorMsg, e);
}
}
/**
- * Print the job/task/attempt summary information
- * @throws IOException
+ * Print the job/task/attempt summary information to stdout.
+ * @throws IOException when there is a problem printing the history
*/
- public void print() throws IOException{
- printJobDetails();
- printTaskSummary();
- printJobAnalysis();
- printTasks(TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString());
- printTasks(TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString());
- printTasks(TaskType.MAP, TaskStatus.State.FAILED.toString());
- printTasks(TaskType.MAP, TaskStatus.State.KILLED.toString());
- printTasks(TaskType.REDUCE, TaskStatus.State.FAILED.toString());
- printTasks(TaskType.REDUCE, TaskStatus.State.KILLED.toString());
- printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString());
- printTasks(TaskType.JOB_CLEANUP,
- JobStatus.getJobRunState(JobStatus.KILLED));
- if (printAll) {
- printTasks(TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString());
- printTasks(TaskType.MAP, TaskStatus.State.SUCCEEDED.toString());
- printTasks(TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString());
- printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.SUCCEEDED.toString());
- printAllTaskAttempts(TaskType.JOB_SETUP);
- printAllTaskAttempts(TaskType.MAP);
- printAllTaskAttempts(TaskType.REDUCE);
- printAllTaskAttempts(TaskType.JOB_CLEANUP);
- }
-
- FilteredJob filter = new FilteredJob(job,
- TaskStatus.State.FAILED.toString());
- printFailedAttempts(filter);
-
- filter = new FilteredJob(job,
- TaskStatus.State.KILLED.toString());
- printFailedAttempts(filter);
- }
-
- private void printJobDetails() {
- StringBuffer jobDetails = new StringBuffer();
- jobDetails.append("\nHadoop job: " ).append(job.getJobId());
- jobDetails.append("\n=====================================");
- jobDetails.append("\nUser: ").append(job.getUsername());
- jobDetails.append("\nJobName: ").append(job.getJobname());
- jobDetails.append("\nJobConf: ").append(job.getJobConfPath());
- jobDetails.append("\nSubmitted At: ").append(StringUtils.
- getFormattedTimeWithDiff(dateFormat,
- job.getSubmitTime(), 0));
- jobDetails.append("\nLaunched At: ").append(StringUtils.
- getFormattedTimeWithDiff(dateFormat,
- job.getLaunchTime(),
- job.getSubmitTime()));
- jobDetails.append("\nFinished At: ").append(StringUtils.
- getFormattedTimeWithDiff(dateFormat,
- job.getFinishTime(),
- job.getLaunchTime()));
- jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ?
- "Incomplete" :job.getJobStatus()));
- printCounters(jobDetails, job.getTotalCounters(), job.getMapCounters(),
- job.getReduceCounters());
- jobDetails.append("\n");
- jobDetails.append("\n=====================================");
- System.out.println(jobDetails.toString());
+ public void print() throws IOException {
+ print(System.out);
}
- private void printCounters(StringBuffer buff, Counters totalCounters,
- Counters mapCounters, Counters reduceCounters) {
- // Killed jobs might not have counters
- if (totalCounters == null) {
- return;
- }
- buff.append("\nCounters: \n\n");
- buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
- "Group Name",
- "Counter name",
- "Map Value",
- "Reduce Value",
- "Total Value"));
- buff.append("\n------------------------------------------"+
- "---------------------------------------------");
- for (String groupName : totalCounters.getGroupNames()) {
- CounterGroup totalGroup = totalCounters.getGroup(groupName);
- CounterGroup mapGroup = mapCounters.getGroup(groupName);
- CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
-
- Format decimal = new DecimalFormat();
- Iterator<org.apache.hadoop.mapreduce.Counter> ctrItr =
- totalGroup.iterator();
- while(ctrItr.hasNext()) {
- org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
- String name = counter.getName();
- String mapValue =
- decimal.format(mapGroup.findCounter(name).getValue());
- String reduceValue =
- decimal.format(reduceGroup.findCounter(name).getValue());
- String totalValue =
- decimal.format(counter.getValue());
-
- buff.append(
- String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
- totalGroup.getDisplayName(),
- counter.getDisplayName(),
- mapValue, reduceValue, totalValue));
- }
- }
- }
-
- private void printAllTaskAttempts(TaskType taskType) {
- Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
- StringBuffer taskList = new StringBuffer();
- taskList.append("\n").append(taskType);
- taskList.append(" task list for ").append(job.getJobId());
- taskList.append("\nTaskId\t\tStartTime");
- if (TaskType.REDUCE.equals(taskType)) {
- taskList.append("\tShuffleFinished\tSortFinished");
- }
- taskList.append("\tFinishTime\tHostName\tError\tTaskLogs");
- taskList.append("\n====================================================");
- System.out.println(taskList.toString());
- for (JobHistoryParser.TaskInfo task : tasks.values()) {
- for (JobHistoryParser.TaskAttemptInfo attempt :
- task.getAllTaskAttempts().values()) {
- if (taskType.equals(task.getTaskType())){
- taskList.setLength(0);
- taskList.append(attempt.getAttemptId()).append("\t");
- taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
- attempt.getStartTime(), 0)).append("\t");
- if (TaskType.REDUCE.equals(taskType)) {
- taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
- attempt.getShuffleFinishTime(),
- attempt.getStartTime()));
- taskList.append("\t");
- taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
- attempt.getSortFinishTime(),
- attempt.getShuffleFinishTime()));
- }
- taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
- attempt.getFinishTime(),
- attempt.getStartTime()));
- taskList.append("\t");
- taskList.append(attempt.getHostname()).append("\t");
- taskList.append(attempt.getError());
- String taskLogsUrl = getTaskLogsUrl(
- WebAppUtils.getHttpSchemePrefix(fs.getConf()), attempt);
- taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a");
- System.out.println(taskList.toString());
- }
- }
- }
- }
-
- private void printTaskSummary() {
- SummarizedJob ts = new SummarizedJob(job);
- StringBuffer taskSummary = new StringBuffer();
- taskSummary.append("\nTask Summary");
- taskSummary.append("\n============================");
- taskSummary.append("\nKind\tTotal\t");
- taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
- taskSummary.append("\n");
- taskSummary.append("\nSetup\t").append(ts.totalSetups);
- taskSummary.append("\t").append(ts.numFinishedSetups);
- taskSummary.append("\t\t").append(ts.numFailedSetups);
- taskSummary.append("\t").append(ts.numKilledSetups);
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.setupStarted, 0));
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.setupFinished, ts.setupStarted));
- taskSummary.append("\nMap\t").append(ts.totalMaps);
- taskSummary.append("\t").append(job.getFinishedMaps());
- taskSummary.append("\t\t").append(ts.numFailedMaps);
- taskSummary.append("\t").append(ts.numKilledMaps);
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.mapStarted, 0));
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.mapFinished, ts.mapStarted));
- taskSummary.append("\nReduce\t").append(ts.totalReduces);
- taskSummary.append("\t").append(job.getFinishedReduces());
- taskSummary.append("\t\t").append(ts.numFailedReduces);
- taskSummary.append("\t").append(ts.numKilledReduces);
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.reduceStarted, 0));
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.reduceFinished, ts.reduceStarted));
- taskSummary.append("\nCleanup\t").append(ts.totalCleanups);
- taskSummary.append("\t").append(ts.numFinishedCleanups);
- taskSummary.append("\t\t").append(ts.numFailedCleanups);
- taskSummary.append("\t").append(ts.numKilledCleanups);
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.cleanupStarted, 0));
- taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, ts.cleanupFinished,
- ts.cleanupStarted));
- taskSummary.append("\n============================\n");
- System.out.println(taskSummary.toString());
- }
-
- private void printJobAnalysis() {
- if (!job.getJobStatus().equals
- (JobStatus.getJobRunState(JobStatus.SUCCEEDED))) {
- System.out.println("No Analysis available as job did not finish");
- return;
- }
-
- AnalyzedJob avg = new AnalyzedJob(job);
-
- System.out.println("\nAnalysis");
- System.out.println("=========");
- printAnalysis(avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), 10);
- printLast(avg.getMapTasks(), "map", cFinishMapRed);
-
- if (avg.getReduceTasks().length > 0) {
- printAnalysis(avg.getReduceTasks(), cShuffle, "shuffle",
- avg.getAvgShuffleTime(), 10);
- printLast(avg.getReduceTasks(), "shuffle", cFinishShuffle);
-
- printAnalysis(avg.getReduceTasks(), cReduce, "reduce",
- avg.getAvgReduceTime(), 10);
- printLast(avg.getReduceTasks(), "reduce", cFinishMapRed);
- }
- System.out.println("=========");
- }
-
- private void printAnalysis(JobHistoryParser.TaskAttemptInfo [] tasks,
- Comparator<JobHistoryParser.TaskAttemptInfo> cmp,
- String taskType,
- long avg,
- int showTasks) {
- Arrays.sort(tasks, cmp);
- JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1];
- StringBuffer details = new StringBuffer();
- details.append("\nTime taken by best performing ");
- details.append(taskType).append(" task ");
- details.append(min.getAttemptId().getTaskID().toString()).append(": ");
- if ("map".equals(taskType)) {
- details.append(StringUtils.formatTimeDiff(
- min.getFinishTime(),
- min.getStartTime()));
- } else if ("shuffle".equals(taskType)) {
- details.append(StringUtils.formatTimeDiff(
- min.getShuffleFinishTime(),
- min.getStartTime()));
- } else {
- details.append(StringUtils.formatTimeDiff(
- min.getFinishTime(),
- min.getShuffleFinishTime()));
- }
- details.append("\nAverage time taken by ");
- details.append(taskType).append(" tasks: ");
- details.append(StringUtils.formatTimeDiff(avg, 0));
- details.append("\nWorse performing ");
- details.append(taskType).append(" tasks: ");
- details.append("\nTaskId\t\tTimetaken");
- System.out.println(details.toString());
- for (int i = 0; i < showTasks && i < tasks.length; i++) {
- details.setLength(0);
- details.append(tasks[i].getAttemptId().getTaskID()).append(" ");
- if ("map".equals(taskType)) {
- details.append(StringUtils.formatTimeDiff(
- tasks[i].getFinishTime(),
- tasks[i].getStartTime()));
- } else if ("shuffle".equals(taskType)) {
- details.append(StringUtils.formatTimeDiff(
- tasks[i].getShuffleFinishTime(),
- tasks[i].getStartTime()));
- } else {
- details.append(StringUtils.formatTimeDiff(
- tasks[i].getFinishTime(),
- tasks[i].getShuffleFinishTime()));
- }
- System.out.println(details.toString());
- }
- }
-
- private void printLast(JobHistoryParser.TaskAttemptInfo [] tasks,
- String taskType,
- Comparator<JobHistoryParser.TaskAttemptInfo> cmp
- ) {
- Arrays.sort(tasks, cFinishMapRed);
- JobHistoryParser.TaskAttemptInfo last = tasks[0];
- StringBuffer lastBuf = new StringBuffer();
- lastBuf.append("The last ").append(taskType);
- lastBuf.append(" task ").append(last.getAttemptId().getTaskID());
- Long finishTime;
- if ("shuffle".equals(taskType)) {
- finishTime = last.getShuffleFinishTime();
- } else {
- finishTime = last.getFinishTime();
- }
- lastBuf.append(" finished at (relative to the Job launch time): ");
- lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
- finishTime, job.getLaunchTime()));
- System.out.println(lastBuf.toString());
- }
-
- private void printTasks(TaskType taskType, String status) {
- Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
- StringBuffer header = new StringBuffer();
- header.append("\n").append(status).append(" ");
- header.append(taskType).append(" task list for ").append(jobId);
- header.append("\nTaskId\t\tStartTime\tFinishTime\tError");
- if (TaskType.MAP.equals(taskType)) {
- header.append("\tInputSplits");
- }
- header.append("\n====================================================");
- StringBuffer taskList = new StringBuffer();
- for (JobHistoryParser.TaskInfo task : tasks.values()) {
- if (taskType.equals(task.getTaskType()) &&
- (status.equals(task.getTaskStatus())
- || status.equalsIgnoreCase("ALL"))) {
- taskList.setLength(0);
- taskList.append(task.getTaskId());
- taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, task.getStartTime(), 0));
- taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
- dateFormat, task.getFinishTime(),
- task.getStartTime()));
- taskList.append("\t").append(task.getError());
- if (TaskType.MAP.equals(taskType)) {
- taskList.append("\t").append(task.getSplitLocations());
- }
- if (taskList != null) {
- System.out.println(header.toString());
- System.out.println(taskList.toString());
- }
- }
- }
- }
-
- private void printFailedAttempts(FilteredJob filteredJob) {
- Map<String, Set<TaskID>> badNodes = filteredJob.getFilteredMap();
- StringBuffer attempts = new StringBuffer();
- if (badNodes.size() > 0) {
- attempts.append("\n").append(filteredJob.getFilter());
- attempts.append(" task attempts by nodes");
- attempts.append("\nHostname\tFailedTasks");
- attempts.append("\n===============================");
- System.out.println(attempts.toString());
- for (Map.Entry<String, Set<TaskID>> entry : badNodes.entrySet()) {
- String node = entry.getKey();
- Set<TaskID> failedTasks = entry.getValue();
- attempts.setLength(0);
- attempts.append(node).append("\t");
- for (TaskID t : failedTasks) {
- attempts.append(t).append(", ");
- }
- System.out.println(attempts.toString());
- }
- }
+ /**
+ * Print the job/task/attempt summary information to the PrintStream.
+ * @param ps The PrintStream to print to
+ * @throws IOException when there is a problem printing the history
+ */
+ public void print(PrintStream ps) throws IOException {
+ jhvp.print(ps);
}
/**
- * Return the TaskLogsUrl of a particular TaskAttempt
+ * Return the TaskLogsUrl of a particular TaskAttempt.
*
- * @param attempt
+ * @param attempt info about the task attempt
* @return the taskLogsUrl. null if http-port or tracker-name or
* task-attempt-id are unavailable.
*/
@@ -480,58 +156,6 @@ public class HistoryViewer {
attempt.getAttemptId().toString());
}
- private Comparator<JobHistoryParser.TaskAttemptInfo> cMap =
- new Comparator<JobHistoryParser.TaskAttemptInfo>() {
- public int compare(JobHistoryParser.TaskAttemptInfo t1,
- JobHistoryParser.TaskAttemptInfo t2) {
- long l1 = t1.getFinishTime() - t1.getStartTime();
- long l2 = t2.getFinishTime() - t2.getStartTime();
- return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
- }
- };
-
- private Comparator<JobHistoryParser.TaskAttemptInfo> cShuffle =
- new Comparator<JobHistoryParser.TaskAttemptInfo>() {
- public int compare(JobHistoryParser.TaskAttemptInfo t1,
- JobHistoryParser.TaskAttemptInfo t2) {
- long l1 = t1.getShuffleFinishTime() - t1.getStartTime();
- long l2 = t2.getShuffleFinishTime() - t2.getStartTime();
- return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
- }
- };
-
- private Comparator<JobHistoryParser.TaskAttemptInfo> cFinishShuffle =
- new Comparator<JobHistoryParser.TaskAttemptInfo>() {
- public int compare(JobHistoryParser.TaskAttemptInfo t1,
- JobHistoryParser.TaskAttemptInfo t2) {
- long l1 = t1.getShuffleFinishTime();
- long l2 = t2.getShuffleFinishTime();
- return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
- }
- };
-
- private Comparator<JobHistoryParser.TaskAttemptInfo> cFinishMapRed =
- new Comparator<JobHistoryParser.TaskAttemptInfo>() {
- public int compare(JobHistoryParser.TaskAttemptInfo t1,
- JobHistoryParser.TaskAttemptInfo t2) {
- long l1 = t1.getFinishTime();
- long l2 = t2.getFinishTime();
- return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
- }
- };
-
- private Comparator<JobHistoryParser.TaskAttemptInfo> cReduce =
- new Comparator<JobHistoryParser.TaskAttemptInfo>() {
- public int compare(JobHistoryParser.TaskAttemptInfo t1,
- JobHistoryParser.TaskAttemptInfo t2) {
- long l1 = t1.getFinishTime() -
- t1.getShuffleFinishTime();
- long l2 = t2.getFinishTime() -
- t2.getShuffleFinishTime();
- return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
- }
- };
-
/**
* Utility class used the summarize the job.
* Used by HistoryViewer and the JobHistory UI.
@@ -694,7 +318,6 @@ public class HistoryViewer {
* Utility class used while analyzing the job.
* Used by HistoryViewer and the JobHistory UI.
*/
-
public static class AnalyzedJob {
private long avgMapTime;
private long avgReduceTime;
@@ -765,7 +388,6 @@ public class HistoryViewer {
/**
* Utility to filter out events based on the task status
- *
*/
public static class FilteredJob {
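To summarize the refactor so far: `HistoryViewer` now only parses the file and selects a printer; all rendering lives in the printer classes. A minimal sketch of driving it programmatically (note the class is `@InterfaceAudience.Private`/`@InterfaceStability.Unstable`, so this is not a supported public API, and the path below is hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;

public class PrintHistoryAsJson {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical .jhist path; anything readable through the configured
    // FileSystem works the same way.
    HistoryViewer viewer = new HistoryViewer(
        "/tmp/job_1317928501754_0001.jhist", conf,
        true /* printAll */, HistoryViewer.JSON_FORMAT);
    viewer.print(System.out);  // the no-arg print() also writes to stdout
  }
}
```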
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewerPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewerPrinter.java
new file mode 100644
index 00000000000..406b4391c83
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewerPrinter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Used by the {@link HistoryViewer} to print job history in different formats.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+interface HistoryViewerPrinter {
+
+ /**
+ * Print out the Job History to the given {@link PrintStream}.
+ * @param ps the {@link PrintStream} to print to
+ * @throws IOException when a problem occurs while printing
+ */
+ void print(PrintStream ps) throws IOException;
+}
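The interface is a small strategy abstraction: one `print(PrintStream)` method, one implementation per output format. Adding a format means one new package-private class plus a branch in the `HistoryViewer` constructor. A hypothetical third printer, for illustration only (it must live in the same package because the interface is package-private):

```java
package org.apache.hadoop.mapreduce.jobhistory;

import java.io.IOException;
import java.io.PrintStream;

/** Hypothetical single-line printer; not part of this patch. */
class OneLineHistoryViewerPrinter implements HistoryViewerPrinter {
  private final JobHistoryParser.JobInfo job;

  OneLineHistoryViewerPrinter(JobHistoryParser.JobInfo job) {
    this.job = job;
  }

  @Override
  public void print(PrintStream ps) throws IOException {
    // One tab-separated line: job id, user, status.
    ps.println(job.getJobId() + "\t" + job.getUsername() + "\t"
        + (job.getJobStatus() == null ? "Incomplete" : job.getJobStatus()));
  }
}
```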
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java
new file mode 100644
index 00000000000..30903afd7d2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+/**
+ * Used by the {@link HistoryViewer} to print job history in a human-readable
+ * format.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class HumanReadableHistoryViewerPrinter implements HistoryViewerPrinter {
+
+ private JobHistoryParser.JobInfo job;
+ private final SimpleDateFormat dateFormat;
+ private boolean printAll;
+ private String scheme;
+
+ HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job,
+ boolean printAll, String scheme) {
+ this.job = job;
+ this.printAll = printAll;
+ this.scheme = scheme;
+ this.dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
+ }
+
+ HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job,
+ boolean printAll, String scheme,
+ TimeZone tz) {
+ this(job, printAll, scheme);
+ this.dateFormat.setTimeZone(tz);
+ }
+
+ /**
+ * Print out the Job History to the given {@link PrintStream} in a
+ * human-readable format.
+ * @param ps the {@link PrintStream} to print to
+ * @throws IOException when a problem occurs while printing
+ */
+ @Override
+ public void print(PrintStream ps) throws IOException {
+ printJobDetails(ps);
+ printTaskSummary(ps);
+ printJobAnalysis(ps);
+ printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString());
+ printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString());
+ printTasks(ps, TaskType.MAP, TaskStatus.State.FAILED.toString());
+ printTasks(ps, TaskType.MAP, TaskStatus.State.KILLED.toString());
+ printTasks(ps, TaskType.REDUCE, TaskStatus.State.FAILED.toString());
+ printTasks(ps, TaskType.REDUCE, TaskStatus.State.KILLED.toString());
+ printTasks(ps, TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString());
+ printTasks(ps, TaskType.JOB_CLEANUP,
+ JobStatus.getJobRunState(JobStatus.KILLED));
+ if (printAll) {
+ printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString());
+ printTasks(ps, TaskType.MAP, TaskStatus.State.SUCCEEDED.toString());
+ printTasks(ps, TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString());
+ printTasks(ps, TaskType.JOB_CLEANUP,
+ TaskStatus.State.SUCCEEDED.toString());
+ printAllTaskAttempts(ps, TaskType.JOB_SETUP);
+ printAllTaskAttempts(ps, TaskType.MAP);
+ printAllTaskAttempts(ps, TaskType.REDUCE);
+ printAllTaskAttempts(ps, TaskType.JOB_CLEANUP);
+ }
+
+ HistoryViewer.FilteredJob filter = new HistoryViewer.FilteredJob(job,
+ TaskStatus.State.FAILED.toString());
+ printFailedAttempts(ps, filter);
+
+ filter = new HistoryViewer.FilteredJob(job,
+ TaskStatus.State.KILLED.toString());
+ printFailedAttempts(ps, filter);
+ }
+
+ private void printJobDetails(PrintStream ps) {
+ StringBuilder jobDetails = new StringBuilder();
+ jobDetails.append("\nHadoop job: ").append(job.getJobId());
+ jobDetails.append("\n=====================================");
+ jobDetails.append("\nUser: ").append(job.getUsername());
+ jobDetails.append("\nJobName: ").append(job.getJobname());
+ jobDetails.append("\nJobConf: ").append(job.getJobConfPath());
+ jobDetails.append("\nSubmitted At: ").append(StringUtils.
+ getFormattedTimeWithDiff(dateFormat,
+ job.getSubmitTime(), 0));
+ jobDetails.append("\nLaunched At: ").append(StringUtils.
+ getFormattedTimeWithDiff(dateFormat,
+ job.getLaunchTime(),
+ job.getSubmitTime()));
+ jobDetails.append("\nFinished At: ").append(StringUtils.
+ getFormattedTimeWithDiff(dateFormat,
+ job.getFinishTime(),
+ job.getLaunchTime()));
+ jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ?
+ "Incomplete" :job.getJobStatus()));
+ printJobCounters(jobDetails, job.getTotalCounters(), job.getMapCounters(),
+ job.getReduceCounters());
+ jobDetails.append("\n");
+ jobDetails.append("\n=====================================");
+ ps.println(jobDetails);
+ }
+
+ private void printJobCounters(StringBuilder buff, Counters totalCounters,
+ Counters mapCounters, Counters reduceCounters) {
+ // Killed jobs might not have counters
+ if (totalCounters != null) {
+ buff.append("\nCounters: \n\n");
+ buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
+ "Group Name",
+ "Counter name",
+ "Map Value",
+ "Reduce Value",
+ "Total Value"));
+ buff.append("\n------------------------------------------" +
+ "---------------------------------------------");
+ for (String groupName : totalCounters.getGroupNames()) {
+ CounterGroup totalGroup = totalCounters.getGroup(groupName);
+ CounterGroup mapGroup = mapCounters.getGroup(groupName);
+ CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
+
+ Format decimal = new DecimalFormat();
+ Iterator<Counter> ctrItr =
+ totalGroup.iterator();
+ while (ctrItr.hasNext()) {
+ org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
+ String name = counter.getName();
+ String mapValue =
+ decimal.format(mapGroup.findCounter(name).getValue());
+ String reduceValue =
+ decimal.format(reduceGroup.findCounter(name).getValue());
+ String totalValue =
+ decimal.format(counter.getValue());
+
+ buff.append(
+ String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
+ totalGroup.getDisplayName(),
+ counter.getDisplayName(),
+ mapValue, reduceValue, totalValue));
+ }
+ }
+ }
+ }
+
+ private void printAllTaskAttempts(PrintStream ps, TaskType taskType) {
+ Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
+ StringBuilder taskList = new StringBuilder();
+ taskList.append("\n").append(taskType);
+ taskList.append(" task list for ").append(job.getJobId());
+ taskList.append("\nTaskId\t\tStartTime");
+ if (TaskType.REDUCE.equals(taskType)) {
+ taskList.append("\tShuffleFinished\tSortFinished");
+ }
+ taskList.append("\tFinishTime\tHostName\tError\tTaskLogs");
+ taskList.append("\n====================================================");
+ ps.println(taskList.toString());
+ for (JobHistoryParser.TaskInfo task : tasks.values()) {
+ for (JobHistoryParser.TaskAttemptInfo attempt :
+ task.getAllTaskAttempts().values()) {
+ if (taskType.equals(task.getTaskType())) {
+ taskList.setLength(0);
+ taskList.append(attempt.getAttemptId()).append("\t");
+ taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+ attempt.getStartTime(), 0)).append("\t");
+ if (TaskType.REDUCE.equals(taskType)) {
+ taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+ attempt.getShuffleFinishTime(),
+ attempt.getStartTime()));
+ taskList.append("\t");
+ taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+ attempt.getSortFinishTime(),
+ attempt.getShuffleFinishTime()));
+ }
+ taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+ attempt.getFinishTime(),
+ attempt.getStartTime()));
+ taskList.append("\t");
+ taskList.append(attempt.getHostname()).append("\t");
+ taskList.append(attempt.getError());
+ String taskLogsUrl = HistoryViewer.getTaskLogsUrl(scheme, attempt);
+ taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a");
+ ps.println(taskList);
+ }
+ }
+ }
+ }
+
+ private void printTaskSummary(PrintStream ps) {
+ HistoryViewer.SummarizedJob ts = new HistoryViewer.SummarizedJob(job);
+ StringBuilder taskSummary = new StringBuilder();
+ taskSummary.append("\nTask Summary");
+ taskSummary.append("\n============================");
+ taskSummary.append("\nKind\tTotal\t");
+ taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
+ taskSummary.append("\n");
+ taskSummary.append("\nSetup\t").append(ts.totalSetups);
+ taskSummary.append("\t").append(ts.numFinishedSetups);
+ taskSummary.append("\t\t").append(ts.numFailedSetups);
+ taskSummary.append("\t").append(ts.numKilledSetups);
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.setupStarted, 0));
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.setupFinished, ts.setupStarted));
+ taskSummary.append("\nMap\t").append(ts.totalMaps);
+ taskSummary.append("\t").append(job.getFinishedMaps());
+ taskSummary.append("\t\t").append(ts.numFailedMaps);
+ taskSummary.append("\t").append(ts.numKilledMaps);
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.mapStarted, 0));
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.mapFinished, ts.mapStarted));
+ taskSummary.append("\nReduce\t").append(ts.totalReduces);
+ taskSummary.append("\t").append(job.getFinishedReduces());
+ taskSummary.append("\t\t").append(ts.numFailedReduces);
+ taskSummary.append("\t").append(ts.numKilledReduces);
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.reduceStarted, 0));
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.reduceFinished, ts.reduceStarted));
+ taskSummary.append("\nCleanup\t").append(ts.totalCleanups);
+ taskSummary.append("\t").append(ts.numFinishedCleanups);
+ taskSummary.append("\t\t").append(ts.numFailedCleanups);
+ taskSummary.append("\t").append(ts.numKilledCleanups);
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.cleanupStarted, 0));
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, ts.cleanupFinished,
+ ts.cleanupStarted));
+ taskSummary.append("\n============================\n");
+ ps.println(taskSummary);
+ }
+
+ private void printJobAnalysis(PrintStream ps) {
+ if (job.getJobStatus().equals(
+ JobStatus.getJobRunState(JobStatus.SUCCEEDED))) {
+ HistoryViewer.AnalyzedJob avg = new HistoryViewer.AnalyzedJob(job);
+
+ ps.println("\nAnalysis");
+ ps.println("=========");
+ printAnalysis(ps, avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(),
+ 10);
+ printLast(ps, avg.getMapTasks(), "map", cFinishMapRed);
+
+ if (avg.getReduceTasks().length > 0) {
+ printAnalysis(ps, avg.getReduceTasks(), cShuffle, "shuffle",
+ avg.getAvgShuffleTime(), 10);
+ printLast(ps, avg.getReduceTasks(), "shuffle", cFinishShuffle);
+
+ printAnalysis(ps, avg.getReduceTasks(), cReduce, "reduce",
+ avg.getAvgReduceTime(), 10);
+ printLast(ps, avg.getReduceTasks(), "reduce", cFinishMapRed);
+ }
+ ps.println("=========");
+ } else {
+ ps.println("No Analysis available as job did not finish");
+ }
+ }
+
+ protected void printAnalysis(PrintStream ps,
+ JobHistoryParser.TaskAttemptInfo[] tasks,
+ Comparator<JobHistoryParser.TaskAttemptInfo> cmp,
+ String taskType, long avg, int showTasks) {
+ Arrays.sort(tasks, cmp);
+ JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1];
+ StringBuilder details = new StringBuilder();
+ details.append("\nTime taken by best performing ");
+ details.append(taskType).append(" task ");
+ details.append(min.getAttemptId().getTaskID().toString()).append(": ");
+ if ("map".equals(taskType)) {
+ details.append(StringUtils.formatTimeDiff(
+ min.getFinishTime(),
+ min.getStartTime()));
+ } else if ("shuffle".equals(taskType)) {
+ details.append(StringUtils.formatTimeDiff(
+ min.getShuffleFinishTime(),
+ min.getStartTime()));
+ } else {
+ details.append(StringUtils.formatTimeDiff(
+ min.getFinishTime(),
+ min.getShuffleFinishTime()));
+ }
+ details.append("\nAverage time taken by ");
+ details.append(taskType).append(" tasks: ");
+ details.append(StringUtils.formatTimeDiff(avg, 0));
+ details.append("\nWorse performing ");
+ details.append(taskType).append(" tasks: ");
+ details.append("\nTaskId\t\tTimetaken");
+ ps.println(details);
+ for (int i = 0; i < showTasks && i < tasks.length; i++) {
+ details.setLength(0);
+ details.append(tasks[i].getAttemptId().getTaskID()).append(" ");
+ if ("map".equals(taskType)) {
+ details.append(StringUtils.formatTimeDiff(
+ tasks[i].getFinishTime(),
+ tasks[i].getStartTime()));
+ } else if ("shuffle".equals(taskType)) {
+ details.append(StringUtils.formatTimeDiff(
+ tasks[i].getShuffleFinishTime(),
+ tasks[i].getStartTime()));
+ } else {
+ details.append(StringUtils.formatTimeDiff(
+ tasks[i].getFinishTime(),
+ tasks[i].getShuffleFinishTime()));
+ }
+ ps.println(details);
+ }
+ }
+
+ protected void printLast(PrintStream ps,
+ JobHistoryParser.TaskAttemptInfo[] tasks, String taskType,
+ Comparator<JobHistoryParser.TaskAttemptInfo> cmp) {
+ Arrays.sort(tasks, cFinishMapRed);
+ JobHistoryParser.TaskAttemptInfo last = tasks[0];
+ StringBuilder lastBuf = new StringBuilder();
+ lastBuf.append("The last ").append(taskType);
+ lastBuf.append(" task ").append(last.getAttemptId().getTaskID());
+ Long finishTime;
+ if ("shuffle".equals(taskType)) {
+ finishTime = last.getShuffleFinishTime();
+ } else {
+ finishTime = last.getFinishTime();
+ }
+ lastBuf.append(" finished at (relative to the Job launch time): ");
+ lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+ finishTime, job.getLaunchTime()));
+ ps.println(lastBuf);
+ }
+
+ private void printTasks(PrintStream ps, TaskType taskType, String status) {
+ Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
+ StringBuilder header = new StringBuilder();
+ header.append("\n").append(status).append(" ");
+ header.append(taskType).append(" task list for ")
+ .append(job.getJobId().toString());
+ header.append("\nTaskId\t\tStartTime\tFinishTime\tError");
+ if (TaskType.MAP.equals(taskType)) {
+ header.append("\tInputSplits");
+ }
+ header.append("\n====================================================");
+ StringBuilder taskList = new StringBuilder();
+ for (JobHistoryParser.TaskInfo task : tasks.values()) {
+ if (taskType.equals(task.getTaskType()) &&
+ (status.equals(task.getTaskStatus())
+ || status.equalsIgnoreCase("ALL"))) {
+ taskList.setLength(0);
+ taskList.append(task.getTaskId());
+ taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, task.getStartTime(), 0));
+ taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, task.getFinishTime(),
+ task.getStartTime()));
+ taskList.append("\t").append(task.getError());
+ if (TaskType.MAP.equals(taskType)) {
+ taskList.append("\t").append(task.getSplitLocations());
+ }
+ if (taskList != null) {
+ ps.println(header);
+ ps.println(taskList);
+ }
+ }
+ }
+ }
+
+ private void printFailedAttempts(PrintStream ps,
+ HistoryViewer.FilteredJob filteredJob) {
+ Map<String, Set<TaskID>> badNodes = filteredJob.getFilteredMap();
+ StringBuilder attempts = new StringBuilder();
+ if (badNodes.size() > 0) {
+ attempts.append("\n").append(filteredJob.getFilter());
+ attempts.append(" task attempts by nodes");
+ attempts.append("\nHostname\tFailedTasks");
+ attempts.append("\n===============================");
+ ps.println(attempts);
+ for (Map.Entry<String, Set<TaskID>> entry : badNodes.entrySet()) {
+ String node = entry.getKey();
+ Set<TaskID> failedTasks = entry.getValue();
+ attempts.setLength(0);
+ attempts.append(node).append("\t");
+ for (TaskID t : failedTasks) {
+ attempts.append(t).append(", ");
+ }
+ ps.println(attempts);
+ }
+ }
+ }
+
+ private static Comparator<JobHistoryParser.TaskAttemptInfo> cMap =
+ new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+ public int compare(JobHistoryParser.TaskAttemptInfo t1,
+ JobHistoryParser.TaskAttemptInfo t2) {
+ long l1 = t1.getFinishTime() - t1.getStartTime();
+ long l2 = t2.getFinishTime() - t2.getStartTime();
+ return Long.compare(l2, l1);
+ }
+ };
+
+ private static Comparator<JobHistoryParser.TaskAttemptInfo> cShuffle =
+ new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+ public int compare(JobHistoryParser.TaskAttemptInfo t1,
+ JobHistoryParser.TaskAttemptInfo t2) {
+ long l1 = t1.getShuffleFinishTime() - t1.getStartTime();
+ long l2 = t2.getShuffleFinishTime() - t2.getStartTime();
+ return Long.compare(l2, l1);
+ }
+ };
+
+ private static Comparator<JobHistoryParser.TaskAttemptInfo> cFinishShuffle =
+ new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+ public int compare(JobHistoryParser.TaskAttemptInfo t1,
+ JobHistoryParser.TaskAttemptInfo t2) {
+ long l1 = t1.getShuffleFinishTime();
+ long l2 = t2.getShuffleFinishTime();
+ return Long.compare(l2, l1);
+ }
+ };
+
+ private static Comparator<JobHistoryParser.TaskAttemptInfo> cFinishMapRed =
+ new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+ public int compare(JobHistoryParser.TaskAttemptInfo t1,
+ JobHistoryParser.TaskAttemptInfo t2) {
+ long l1 = t1.getFinishTime();
+ long l2 = t2.getFinishTime();
+ return Long.compare(l2, l1);
+ }
+ };
+
+ private static Comparator<JobHistoryParser.TaskAttemptInfo> cReduce =
+ new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+ public int compare(JobHistoryParser.TaskAttemptInfo t1,
+ JobHistoryParser.TaskAttemptInfo t2) {
+ long l1 = t1.getFinishTime() - t1.getShuffleFinishTime();
+ long l2 = t2.getFinishTime() - t2.getShuffleFinishTime();
+ return Long.compare(l2, l1);
+ }
+ };
+}
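All five comparators pass their arguments to `Long.compare` in reversed order, so `Arrays.sort` yields longest-first (descending) order; that is why `printAnalysis` takes the best performer from the *end* of the array and `printLast` reads the latest finisher from index 0. The same idiom in isolation:

```java
import java.util.Arrays;

public class DescendingSort {
  public static void main(String[] args) {
    Long[] durations = {5L, 42L, 17L};
    // Long.compare(b, a) instead of (a, b) reverses the natural order.
    Arrays.sort(durations, (a, b) -> Long.compare(b, a));
    System.out.println(Arrays.toString(durations));  // [42, 17, 5]
  }
}
```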
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JSONHistoryViewerPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JSONHistoryViewerPrinter.java
new file mode 100644
index 00000000000..456dcf7111e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JSONHistoryViewerPrinter.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Used by the {@link HistoryViewer} to print job history in a machine-readable
+ * JSON format.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class JSONHistoryViewerPrinter implements HistoryViewerPrinter {
+
+ private JobHistoryParser.JobInfo job;
+ private boolean printAll;
+ private String scheme;
+ private JSONObject json;
+
+ JSONHistoryViewerPrinter(JobHistoryParser.JobInfo job, boolean printAll,
+ String scheme) {
+ this.job = job;
+ this.printAll = printAll;
+ this.scheme = scheme;
+ }
+
+ /**
+ * Print out the Job History to the given {@link PrintStream} in a
+ * machine-readable JSON format.
+ * @param ps the {@link PrintStream} to print to
+ * @throws IOException when a problem occurs while printing
+ */
+ @Override
+ public void print(PrintStream ps) throws IOException {
+ json = new JSONObject();
+
+ Writer writer = null;
+ try {
+ printJobDetails();
+ printTaskSummary();
+ printTasks();
+
+ writer = new OutputStreamWriter(ps, "UTF-8");
+ json.write(writer);
+ writer.flush();
+ } catch (JSONException je) {
+ throw new IOException("Failure parsing JSON document: " + je.getMessage(),
+ je);
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ }
+
+ private void printJobDetails() throws JSONException {
+ json.put("hadoopJob", job.getJobId().toString());
+ json.put("user", job.getUsername());
+ json.put("jobName", job.getJobname());
+ json.put("jobConf", job.getJobConfPath());
+ json.put("submittedAt", job.getSubmitTime());
+ json.put("launchedAt", job.getLaunchTime());
+ json.put("finishedAt", job.getFinishTime());
+ json.put("status", ((job.getJobStatus() == null) ?
+ "Incomplete" :job.getJobStatus()));
+ printJobCounters(job.getTotalCounters(), job.getMapCounters(),
+ job.getReduceCounters());
+ }
+
+ private void printJobCounters(Counters totalCounters, Counters mapCounters,
+ Counters reduceCounters) throws JSONException {
+ // Killed jobs might not have counters
+ if (totalCounters != null) {
+ JSONObject jGroups = new JSONObject();
+ for (String groupName : totalCounters.getGroupNames()) {
+ CounterGroup totalGroup = totalCounters.getGroup(groupName);
+ CounterGroup mapGroup = mapCounters.getGroup(groupName);
+ CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
+
+ Iterator<Counter> ctrItr =
+ totalGroup.iterator();
+ JSONArray jGroup = new JSONArray();
+ while (ctrItr.hasNext()) {
+ JSONObject jCounter = new JSONObject();
+ org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
+ String name = counter.getName();
+ long mapValue = mapGroup.findCounter(name).getValue();
+ long reduceValue = reduceGroup.findCounter(name).getValue();
+ long totalValue = counter.getValue();
+
+ jCounter.put("counterName", name);
+ jCounter.put("mapValue", mapValue);
+ jCounter.put("reduceValue", reduceValue);
+ jCounter.put("totalValue", totalValue);
+ jGroup.put(jCounter);
+ }
+ jGroups.put(fixGroupNameForShuffleErrors(totalGroup.getName()), jGroup);
+ }
+ json.put("counters", jGroups);
+ }
+ }
+
+ private void printTaskSummary() throws JSONException {
+ HistoryViewer.SummarizedJob ts = new HistoryViewer.SummarizedJob(job);
+ JSONObject jSums = new JSONObject();
+ JSONObject jSumSetup = new JSONObject();
+ jSumSetup.put("total", ts.totalSetups);
+ jSumSetup.put("successful", ts.numFinishedSetups);
+ jSumSetup.put("failed", ts.numFailedSetups);
+ jSumSetup.put("killed", ts.numKilledSetups);
+ jSumSetup.put("startTime", ts.setupStarted);
+ jSumSetup.put("finishTime", ts.setupFinished);
+ jSums.put("setup", jSumSetup);
+ JSONObject jSumMap = new JSONObject();
+ jSumMap.put("total", ts.totalMaps);
+ jSumMap.put("successful", job.getFinishedMaps());
+ jSumMap.put("failed", ts.numFailedMaps);
+ jSumMap.put("killed", ts.numKilledMaps);
+ jSumMap.put("startTime", ts.mapStarted);
+ jSumMap.put("finishTime", ts.mapFinished);
+ jSums.put("map", jSumMap);
+ JSONObject jSumReduce = new JSONObject();
+ jSumReduce.put("total", ts.totalReduces);
+ jSumReduce.put("successful", job.getFinishedReduces());
+ jSumReduce.put("failed", ts.numFailedReduces);
+ jSumReduce.put("killed", ts.numKilledReduces);
+ jSumReduce.put("startTime", ts.reduceStarted);
+ jSumReduce.put("finishTime", ts.reduceFinished);
+ jSums.put("reduce", jSumReduce);
+ JSONObject jSumCleanup = new JSONObject();
+ jSumCleanup.put("total", ts.totalCleanups);
+ jSumCleanup.put("successful", ts.numFinishedCleanups);
+ jSumCleanup.put("failed", ts.numFailedCleanups);
+ jSumCleanup.put("killed", ts.numKilledCleanups);
+ jSumCleanup.put("startTime", ts.cleanupStarted);
+ jSumCleanup.put("finishTime", ts.cleanupFinished);
+ jSums.put("cleanup", jSumCleanup);
+ json.put("taskSummary", jSums);
+ }
+
+ private void printTasks() throws JSONException {
+ Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
+ JSONArray jTasks = new JSONArray();
+ for (JobHistoryParser.TaskInfo task : tasks.values()) {
+ if (!task.getTaskType().equals(TaskType.TASK_CLEANUP) &&
+ ((printAll && task.getTaskStatus().equals(
+ TaskStatus.State.SUCCEEDED.toString()))
+ || task.getTaskStatus().equals(TaskStatus.State.KILLED.toString())
+ || task.getTaskStatus().equals(TaskStatus.State.FAILED.toString()))) {
+ JSONObject jTask = new JSONObject();
+ jTask.put("taskId", task.getTaskId().toString());
+ jTask.put("type", task.getTaskType().toString());
+ jTask.put("status", task.getTaskStatus());
+ jTask.put("startTime", task.getStartTime());
+ jTask.put("finishTime", task.getFinishTime());
+ if (!task.getError().isEmpty()) {
+ jTask.put("error", task.getError());
+ }
+ if (task.getTaskType().equals(TaskType.MAP)) {
+ jTask.put("inputSplits", task.getSplitLocations());
+ }
+ if (printAll) {
+ printTaskCounters(jTask, task.getCounters());
+ JSONObject jAtt = new JSONObject();
+ for (JobHistoryParser.TaskAttemptInfo attempt :
+ task.getAllTaskAttempts().values()) {
+ jAtt.put("attemptId", attempt.getAttemptId());
+ jAtt.put("startTime", attempt.getStartTime());
+ if (task.getTaskType().equals(TaskType.REDUCE)) {
+ jAtt.put("shuffleFinished", attempt.getShuffleFinishTime());
+ jAtt.put("sortFinished", attempt.getSortFinishTime());
+ }
+ jAtt.put("finishTime", attempt.getFinishTime());
+ jAtt.put("hostName", attempt.getHostname());
+ if (!attempt.getError().isEmpty()) {
+ jAtt.put("error", task.getError());
+ }
+ String taskLogsUrl = HistoryViewer.getTaskLogsUrl(scheme, attempt);
+ if (taskLogsUrl != null) {
+ jAtt.put("taskLogs", taskLogsUrl);
+ }
+ }
+ jTask.put("attempts", jAtt);
+ }
+ jTasks.put(jTask);
+ }
+ json.put("tasks", jTasks);
+ }
+ }
+
+ private void printTaskCounters(JSONObject jTask, Counters taskCounters)
+ throws JSONException {
+ // Killed tasks might not have counters
+ if (taskCounters != null) {
+ JSONObject jGroups = new JSONObject();
+ for (String groupName : taskCounters.getGroupNames()) {
+ CounterGroup group = taskCounters.getGroup(groupName);
+
+ Iterator<Counter> ctrItr = group.iterator();
+ JSONArray jGroup = new JSONArray();
+ while (ctrItr.hasNext()) {
+ JSONObject jCounter = new JSONObject();
+ org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
+
+ jCounter.put("counterName", counter.getName());
+ jCounter.put("value", counter.getValue());
+ jGroup.put(jCounter);
+ }
+ jGroups.put(fixGroupNameForShuffleErrors(group.getName()), jGroup);
+ }
+ jTask.put("counters", jGroups);
+ }
+ }
+
+ private String fixGroupNameForShuffleErrors(String name) {
+ String retName = name;
+
+ if (name.equals("Shuffle Errors")) {
+ retName = "org.apache.hadoop.mapreduce.task.reduce.Fetcher.ShuffleErrors";
+ }
+
+ return retName;
+ }
+}
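Reading the keys off the code above, the emitted document has roughly the following shape. All values here are made up for illustration; note that `attempts` is a single object rather than an array because the printer reuses one `JSONObject` across attempts:

```json
{
  "hadoopJob": "job_1317928501754_0001",
  "user": "rkanter",
  "jobName": "example",
  "jobConf": "/tmp/job.xml",
  "submittedAt": 1317928501754,
  "launchedAt": 1317928533848,
  "finishedAt": 1317928754361,
  "status": "SUCCEEDED",
  "counters": {
    "group1": [
      {"counterName": "counter1", "mapValue": 5,
       "reduceValue": 0, "totalValue": 5}
    ]
  },
  "taskSummary": {
    "setup": {"total": 1, "successful": 1, "failed": 0, "killed": 0,
              "startTime": 1317928533848, "finishTime": 1317928534848},
    "map": {"...": "same fields as setup"},
    "reduce": {"...": "same fields as setup"},
    "cleanup": {"...": "same fields as setup"}
  },
  "tasks": [
    {"taskId": "task_1317928501754_0001_m_000002", "type": "MAP",
     "status": "FAILED", "startTime": 1317928535848,
     "finishTime": 1317928537848, "error": "example error",
     "inputSplits": "localhost"}
  ]
}
```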
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
index db87d9dda17..ea9c1c0a32f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.mapreduce.tools;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
@@ -92,6 +96,8 @@ public class CLI extends Configured implements Tool {
String jobid = null;
String taskid = null;
String historyFile = null;
+ String historyOutFile = null;
+ String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
String counterGroupName = null;
String counterName = null;
JobPriority jp = null;
@@ -173,16 +179,51 @@ public class CLI extends Configured implements Tool {
nEvents = Integer.parseInt(argv[3]);
listEvents = true;
} else if ("-history".equals(cmd)) {
- if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
- displayUsage(cmd);
- return exitCode;
- }
viewHistory = true;
- if (argv.length == 3 && "all".equals(argv[1])) {
+ if (argv.length < 2 || argv.length > 7) {
+ displayUsage(cmd);
+ return exitCode;
+ }
+
+ // Some arguments are optional while others are not, and some require
+ // second arguments. Due to this, the indexing can vary depending on
+ // what's specified and what's left out, as summarized in the below table:
+      //    [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]
+      //      1          2              3       4         5         6
+      //      1          2              3       4
+      //      1          2                                3         4
+      //      1          2
+      //                 1              2       3         4         5
+      //                 1              2       3
+      //                 1                                2         3
+      //                 1
+
+ // "all" is optional, but comes first if specified
+ int index = 1;
+ if ("all".equals(argv[index])) {
+ index++;
viewAllHistory = true;
- historyFile = argv[2];
- } else {
- historyFile = argv[1];
+ if (argv.length == 2) {
+ displayUsage(cmd);
+ return exitCode;
+ }
+ }
+ // Get the job history file argument
+ historyFile = argv[index++];
+ // "-outfile" is optional, but if specified requires a second argument
+ if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
+ index++;
+ historyOutFile = argv[index++];
+ }
+ // "-format" is optional, but if specified required a second argument
+ if (argv.length > index + 1 && "-format".equals(argv[index])) {
+ index++;
+ historyOutFormat = argv[index++];
+ }
+ // Check for any extra arguments that don't belong here
+ if (argv.length > index) {
+ displayUsage(cmd);
+ return exitCode;
}
} else if ("-list".equals(cmd)) {
if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
@@ -338,7 +379,8 @@ public class CLI extends Configured implements Tool {
exitCode = 0;
}
} else if (viewHistory) {
- viewHistory(historyFile, viewAllHistory);
+ viewHistory(historyFile, viewAllHistory, historyOutFile,
+ historyOutFormat);
exitCode = 0;
} else if (listEvents) {
listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
@@ -451,7 +493,8 @@ public class CLI extends Configured implements Tool {
System.err.println(prefix + "[" + cmd +
" <#-of-events>]. Event #s start from 1.");
} else if ("-history".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " ]");
+ System.err.println(prefix + "[" + cmd +
+ " [all] [-outfile ] [-format ]]");
} else if ("-list".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " [all]]");
} else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
@@ -484,7 +527,8 @@ public class CLI extends Configured implements Tool {
"Valid values for priorities are: " + jobPriorityValues +
". In addition to this, integers also can be used." + "%n");
System.err.printf("\t[-events <#-of-events>]%n");
- System.err.printf("\t[-history ]%n");
+ System.err.printf("\t[-history [all] [-outfile ]" +
+ " [-format ]]%n");
System.err.printf("\t[-list [all]]%n");
System.err.printf("\t[-list-active-trackers]%n");
System.err.printf("\t[-list-blacklisted-trackers]%n");
@@ -499,11 +543,16 @@ public class CLI extends Configured implements Tool {
}
}
- private void viewHistory(String historyFile, boolean all)
- throws IOException {
+ private void viewHistory(String historyFile, boolean all,
+ String historyOutFile, String format) throws IOException {
HistoryViewer historyViewer = new HistoryViewer(historyFile,
- getConf(), all);
- historyViewer.print();
+ getConf(), all, format);
+ PrintStream ps = System.out;
+ if (historyOutFile != null) {
+ ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(
+ new File(historyOutFile))), true, "UTF-8");
+ }
+ historyViewer.print(ps);
}
protected long getCounter(Counters counters, String counterGroupName,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
index 7d6f5ffefc9..dda2ddef913 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md
@@ -81,7 +81,7 @@ Copy file or directories recursively. More information can be found at
Command to interact with Map Reduce Jobs.
-Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id>] | [-counter <job-id> <group-name> <counter-name>] | [-kill <job-id>] | [-events <job-id> <from-event-#> <#-of-events>] | [-history [all] <jobHistoryFile>] | [-list [all]] | [-kill-task <task-attempt-id>] | [-fail-task <task-attempt-id>] | [-set-priority <job-id> <priority>] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids <job-id> <task-type> <task-state>] [-logs <job-id> <task-attempt-id>]`
+Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id>] | [-counter <job-id> <group-name> <counter-name>] | [-kill <job-id>] | [-events <job-id> <from-event-#> <#-of-events>] | [-history [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]] | [-list [all]] | [-kill-task <task-attempt-id>] | [-fail-task <task-attempt-id>] | [-set-priority <job-id> <priority>] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids <job-id> <task-type> <task-state>] [-logs <job-id> <task-attempt-id>]`
| COMMAND\_OPTION | Description |
|:---- |:---- |
@@ -90,7 +90,7 @@ Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id>]
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestHistoryViewerPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestHistoryViewerPrinter.java
new file mode 100644
+ job.tasksMap = new HashMap<>();
+ addTaskInfo(job, TaskType.JOB_SETUP, 1, TaskStatus.State.SUCCEEDED);
+ addTaskInfo(job, TaskType.MAP, 2, TaskStatus.State.FAILED);
+ addTaskInfo(job, TaskType.MAP, 3, TaskStatus.State.SUCCEEDED);
+ addTaskInfo(job, TaskType.MAP, 4, TaskStatus.State.SUCCEEDED);
+ addTaskInfo(job, TaskType.MAP, 5, TaskStatus.State.SUCCEEDED);
+ addTaskInfo(job, TaskType.MAP, 6, TaskStatus.State.SUCCEEDED);
+ addTaskInfo(job, TaskType.MAP, 7, TaskStatus.State.SUCCEEDED);
+ addTaskInfo(job, TaskType.REDUCE, 8, TaskStatus.State.SUCCEEDED);
+ addTaskInfo(job, TaskType.JOB_CLEANUP, 9, TaskStatus.State.SUCCEEDED);
+ return job;
+ }
+
+ private static Counters createCounters() {
+ Counters counters = new Counters();
+ counters.findCounter("group1", "counter1").setValue(5);
+ counters.findCounter("group1", "counter2").setValue(10);
+ counters.findCounter("group2", "counter1").setValue(15);
+ return counters;
+ }
+
+ private static void addTaskInfo(JobHistoryParser.JobInfo job,
+ TaskType type, int id, TaskStatus.State status) {
+ JobHistoryParser.TaskInfo task = new JobHistoryParser.TaskInfo();
+ task.taskId = new TaskID(job.getJobId(), type, id);
+ task.startTime = job.getLaunchTime() + id * 1000;
+ task.finishTime = task.startTime + id * 1000;
+ task.taskType = type;
+ task.counters = createCounters();
+ task.status = status.name();
+ task.attemptsMap = new HashMap<>();
+ addTaskAttemptInfo(task, 1);
+ job.tasksMap.put(task.getTaskId(), task);
+ }
+
+ private static void addTaskAttemptInfo(
+ JobHistoryParser.TaskInfo task, int id) {
+ JobHistoryParser.TaskAttemptInfo attempt =
+ new JobHistoryParser.TaskAttemptInfo();
+ attempt.attemptId = new TaskAttemptID(
+ TaskID.downgrade(task.getTaskId()), id);
+ attempt.startTime = task.getStartTime();
+ attempt.finishTime = task.getFinishTime();
+ attempt.shuffleFinishTime = task.getFinishTime();
+ attempt.sortFinishTime = task.getFinishTime();
+ attempt.mapFinishTime = task.getFinishTime();
+ attempt.status = task.getTaskStatus();
+ attempt.taskType = task.getTaskType();
+ attempt.trackerName = "localhost";
+ attempt.httpPort = 1234;
+ attempt.hostname = "localhost";
+ task.attemptsMap.put(attempt.getAttemptId(), attempt);
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
index 23822390355..f13c1635df0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
@@ -29,6 +30,8 @@ import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -359,27 +362,152 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
String historyFileUri = new Path(f.getAbsolutePath())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toUri()
.toString();
-
- // bad command
- int exitCode = runTool(conf, jc, new String[] { "-history", "pul",
- historyFileUri }, out);
- assertEquals("Exit code", -1, exitCode);
- exitCode = runTool(conf, jc, new String[] { "-history", "all",
- historyFileUri }, out);
+ // Try a bunch of different valid combinations of the command to test
+ // argument parsing
+ int exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ "all",
+ historyFileUri,
+ }, out);
assertEquals("Exit code", 0, exitCode);
- String line;
+ checkHistoryHumanOutput(out);
+ File outFile = File.createTempFile("myout", ".txt");
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ "all",
+ historyFileUri,
+ "-outfile",
+ outFile.getAbsolutePath()
+ }, out);
+ assertEquals("Exit code", 0, exitCode);
+ checkHistoryHumanFileOutput(out, outFile);
+ outFile = File.createTempFile("myout", ".txt");
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ "all",
+ historyFileUri,
+ "-outfile",
+ outFile.getAbsolutePath(),
+ "-format",
+ "human"
+ }, out);
+ assertEquals("Exit code", 0, exitCode);
+ checkHistoryHumanFileOutput(out, outFile);
+
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ historyFileUri,
+ "-format",
+ "human"
+ }, out);
+ assertEquals("Exit code", 0, exitCode);
+ checkHistoryHumanOutput(out);
+
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ "all",
+ historyFileUri,
+ "-format",
+ "json"
+ }, out);
+ assertEquals("Exit code", 0, exitCode);
+ checkHistoryJSONOutput(out);
+ outFile = File.createTempFile("myout", ".txt");
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ "all",
+ historyFileUri,
+ "-outfile",
+ outFile.getAbsolutePath(),
+ "-format",
+ "json"
+ }, out);
+ assertEquals("Exit code", 0, exitCode);
+ checkHistoryJSONFileOutput(out, outFile);
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ historyFileUri,
+ "-format",
+ "json"
+ }, out);
+ assertEquals("Exit code", 0, exitCode);
+ checkHistoryJSONOutput(out);
+
+ // Check some bad arguments
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ historyFileUri,
+ "foo"
+ }, out);
+ assertEquals("Exit code", -1, exitCode);
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ historyFileUri,
+ "-format"
+ }, out);
+ assertEquals("Exit code", -1, exitCode);
+ exitCode = runTool(conf, jc, new String[] {
+ "-history",
+ historyFileUri,
+ "-outfile",
+ }, out);
+ assertEquals("Exit code", -1, exitCode);
+ try {
+ runTool(conf, jc, new String[]{
+ "-history",
+ historyFileUri,
+ "-format",
+ "foo"
+ }, out);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ private void checkHistoryHumanOutput(ByteArrayOutputStream out)
+ throws IOException, JSONException {
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
- int counter = 0;
- while ((line = br.readLine()) != null) {
- LOG.info("line = " + line);
- if (line.startsWith("task_")) {
- counter++;
- }
- }
- assertEquals(23, counter);
+ br.readLine();
+ String line = br.readLine();
+ br.close();
+ assertEquals("Hadoop job: job_1329348432655_0001", line);
+ out.reset();
}
+
+ private void checkHistoryJSONOutput(ByteArrayOutputStream out)
+ throws IOException, JSONException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+ String line = org.apache.commons.io.IOUtils.toString(br);
+ br.close();
+ JSONObject json = new JSONObject(line);
+ assertEquals("job_1329348432655_0001", json.getString("hadoopJob"));
+ out.reset();
+ }
+
+ private void checkHistoryHumanFileOutput(ByteArrayOutputStream out,
+ File outFile) throws IOException, JSONException {
+ BufferedReader br = new BufferedReader(new FileReader(outFile));
+ br.readLine();
+ String line = br.readLine();
+ br.close();
+ assertEquals("Hadoop job: job_1329348432655_0001", line);
+ assertEquals(0, out.size());
+ }
+
+ private void checkHistoryJSONFileOutput(ByteArrayOutputStream out,
+ File outFile) throws IOException, JSONException {
+ BufferedReader br = new BufferedReader(new FileReader(outFile));
+ String line = org.apache.commons.io.IOUtils.toString(br);
+ br.close();
+ JSONObject json = new JSONObject(line);
+ assertEquals("job_1329348432655_0001", json.getString("hadoopJob"));
+ assertEquals(0, out.size());
+ }
+
/**
* print job events list
*/
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 607857343e0..f23b46ed81a 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -992,6 +992,12 @@
+      <dependency>
+        <groupId>org.skyscreamer</groupId>
+        <artifactId>jsonassert</artifactId>
+        <version>1.3.0</version>
+      </dependency>
+