From 8eee59ce6b3044cb73bb41fed6b7ece959e7c2f8 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Thu, 18 Feb 2016 17:58:26 -0800 Subject: [PATCH] MAPREDUCE-6627. Add machine-readable output to mapred job -history command (rkanter) --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop-mapreduce-client-core/pom.xml | 5 + .../mapreduce/jobhistory/HistoryViewer.java | 480 +-------- .../jobhistory/HistoryViewerPrinter.java | 39 + .../HumanReadableHistoryViewerPrinter.java | 471 +++++++++ .../jobhistory/JSONHistoryViewerPrinter.java | 256 +++++ .../apache/hadoop/mapreduce/tools/CLI.java | 79 +- .../src/site/markdown/MapredCommands.md | 4 +- .../jobhistory/TestHistoryViewerPrinter.java | 945 ++++++++++++++++++ .../hadoop/mapreduce/TestMRJobClient.java | 160 ++- hadoop-project/pom.xml | 6 + 11 files changed, 1986 insertions(+), 462 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewerPrinter.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JSONHistoryViewerPrinter.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestHistoryViewerPrinter.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index da28bc607de..a6031c00ab8 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -313,6 +313,9 @@ Release 2.9.0 - UNRELEASED MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. (Sidharta Seethana via vvasudev) + MAPREDUCE-6627. Add machine-readable output to mapred job -history + command (rkanter) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml index 61f3c533b81..f0952bc1ac8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml @@ -42,6 +42,11 @@ hadoop-hdfs test + + org.skyscreamer + jsonassert + test + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java index f343d7c2e2d..e5db2e5a1c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java @@ -19,13 +19,8 @@ package org.apache.hadoop.mapreduce.jobhistory; import java.io.FileNotFoundException; import java.io.IOException; -import java.text.DecimalFormat; -import java.text.Format; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Comparator; +import java.io.PrintStream; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -37,46 +32,52 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.TaskStatus; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.util.HostUtil; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; /** - * HistoryViewer is used to parse and view the JobHistory files - * + * HistoryViewer is used to parse and view the JobHistory files. They can be + * printed in human-readable format or machine-readable JSON format using the + * {@link HistoryViewerPrinter}. */ @InterfaceAudience.Private @InterfaceStability.Unstable public class HistoryViewer { private static final Log LOG = LogFactory.getLog(HistoryViewer.class); - private static final SimpleDateFormat dateFormat = - new SimpleDateFormat("d-MMM-yyyy HH:mm:ss"); private FileSystem fs; private JobInfo job; - private String jobId; - private boolean printAll; + private HistoryViewerPrinter jhvp; + public static final String HUMAN_FORMAT = "human"; + public static final String JSON_FORMAT = "json"; -/** - * Constructs the HistoryViewer object - * @param historyFile The fully qualified Path of the History File - * @param conf The Configuration file - * @param printAll Toggle to print all status to only killed/failed status - * @throws IOException - */ - public HistoryViewer(String historyFile, - Configuration conf, + /** + * Constructs the HistoryViewer object. + * @param historyFile the fully qualified Path of the History File + * @param conf the Configuration file + * @param printAll toggle to print all status to only killed/failed status + * @throws IOException when there is a problem parsing the history file + */ + public HistoryViewer(String historyFile, Configuration conf, boolean printAll) throws IOException { - this.printAll = printAll; + this(historyFile, conf, printAll, HUMAN_FORMAT); + } + + /** + * Constructs the HistoryViewer object. + * @param historyFile the fully qualified Path of the History File + * @param conf the Configuration file + * @param printAll toggle to print all status to only killed/failed status + * @param format the output format to use + * @throws IOException when there is a problem parsing the history file + */ + public HistoryViewer(String historyFile, Configuration conf, boolean printAll, + String format) throws IOException { String errorMsg = "Unable to initialize History Viewer"; try { Path jobFile = new Path(historyFile); @@ -101,366 +102,41 @@ public class HistoryViewer { } JobHistoryParser parser = new JobHistoryParser(fs, jobFile); job = parser.parse(); - jobId = job.getJobId().toString(); - } catch(Exception e) { + String scheme = WebAppUtils.getHttpSchemePrefix(fs.getConf()); + if (HUMAN_FORMAT.equalsIgnoreCase(format)) { + jhvp = new HumanReadableHistoryViewerPrinter(job, printAll, scheme); + } else if (JSON_FORMAT.equalsIgnoreCase(format)) { + jhvp = new JSONHistoryViewerPrinter(job, printAll, scheme); + } else { + System.err.println("Invalid format specified: " + format); + throw new IllegalArgumentException(errorMsg); + } + } catch(IOException e) { throw new IOException(errorMsg, e); } } /** - * Print the job/task/attempt summary information - * @throws IOException + * Print the job/task/attempt summary information to stdout. + * @throws IOException when there is a problem printing the history */ - public void print() throws IOException{ - printJobDetails(); - printTaskSummary(); - printJobAnalysis(); - printTasks(TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString()); - printTasks(TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString()); - printTasks(TaskType.MAP, TaskStatus.State.FAILED.toString()); - printTasks(TaskType.MAP, TaskStatus.State.KILLED.toString()); - printTasks(TaskType.REDUCE, TaskStatus.State.FAILED.toString()); - printTasks(TaskType.REDUCE, TaskStatus.State.KILLED.toString()); - printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString()); - printTasks(TaskType.JOB_CLEANUP, - JobStatus.getJobRunState(JobStatus.KILLED)); - if (printAll) { - printTasks(TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString()); - printTasks(TaskType.MAP, TaskStatus.State.SUCCEEDED.toString()); - printTasks(TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString()); - printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.SUCCEEDED.toString()); - printAllTaskAttempts(TaskType.JOB_SETUP); - printAllTaskAttempts(TaskType.MAP); - printAllTaskAttempts(TaskType.REDUCE); - printAllTaskAttempts(TaskType.JOB_CLEANUP); - } - - FilteredJob filter = new FilteredJob(job, - TaskStatus.State.FAILED.toString()); - printFailedAttempts(filter); - - filter = new FilteredJob(job, - TaskStatus.State.KILLED.toString()); - printFailedAttempts(filter); - } - - private void printJobDetails() { - StringBuffer jobDetails = new StringBuffer(); - jobDetails.append("\nHadoop job: " ).append(job.getJobId()); - jobDetails.append("\n====================================="); - jobDetails.append("\nUser: ").append(job.getUsername()); - jobDetails.append("\nJobName: ").append(job.getJobname()); - jobDetails.append("\nJobConf: ").append(job.getJobConfPath()); - jobDetails.append("\nSubmitted At: ").append(StringUtils. - getFormattedTimeWithDiff(dateFormat, - job.getSubmitTime(), 0)); - jobDetails.append("\nLaunched At: ").append(StringUtils. - getFormattedTimeWithDiff(dateFormat, - job.getLaunchTime(), - job.getSubmitTime())); - jobDetails.append("\nFinished At: ").append(StringUtils. - getFormattedTimeWithDiff(dateFormat, - job.getFinishTime(), - job.getLaunchTime())); - jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ? - "Incomplete" :job.getJobStatus())); - printCounters(jobDetails, job.getTotalCounters(), job.getMapCounters(), - job.getReduceCounters()); - jobDetails.append("\n"); - jobDetails.append("\n====================================="); - System.out.println(jobDetails.toString()); + public void print() throws IOException { + print(System.out); } - private void printCounters(StringBuffer buff, Counters totalCounters, - Counters mapCounters, Counters reduceCounters) { - // Killed jobs might not have counters - if (totalCounters == null) { - return; - } - buff.append("\nCounters: \n\n"); - buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|", - "Group Name", - "Counter name", - "Map Value", - "Reduce Value", - "Total Value")); - buff.append("\n------------------------------------------"+ - "---------------------------------------------"); - for (String groupName : totalCounters.getGroupNames()) { - CounterGroup totalGroup = totalCounters.getGroup(groupName); - CounterGroup mapGroup = mapCounters.getGroup(groupName); - CounterGroup reduceGroup = reduceCounters.getGroup(groupName); - - Format decimal = new DecimalFormat(); - Iterator ctrItr = - totalGroup.iterator(); - while(ctrItr.hasNext()) { - org.apache.hadoop.mapreduce.Counter counter = ctrItr.next(); - String name = counter.getName(); - String mapValue = - decimal.format(mapGroup.findCounter(name).getValue()); - String reduceValue = - decimal.format(reduceGroup.findCounter(name).getValue()); - String totalValue = - decimal.format(counter.getValue()); - - buff.append( - String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s", - totalGroup.getDisplayName(), - counter.getDisplayName(), - mapValue, reduceValue, totalValue)); - } - } - } - - private void printAllTaskAttempts(TaskType taskType) { - Map tasks = job.getAllTasks(); - StringBuffer taskList = new StringBuffer(); - taskList.append("\n").append(taskType); - taskList.append(" task list for ").append(job.getJobId()); - taskList.append("\nTaskId\t\tStartTime"); - if (TaskType.REDUCE.equals(taskType)) { - taskList.append("\tShuffleFinished\tSortFinished"); - } - taskList.append("\tFinishTime\tHostName\tError\tTaskLogs"); - taskList.append("\n===================================================="); - System.out.println(taskList.toString()); - for (JobHistoryParser.TaskInfo task : tasks.values()) { - for (JobHistoryParser.TaskAttemptInfo attempt : - task.getAllTaskAttempts().values()) { - if (taskType.equals(task.getTaskType())){ - taskList.setLength(0); - taskList.append(attempt.getAttemptId()).append("\t"); - taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, - attempt.getStartTime(), 0)).append("\t"); - if (TaskType.REDUCE.equals(taskType)) { - taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, - attempt.getShuffleFinishTime(), - attempt.getStartTime())); - taskList.append("\t"); - taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, - attempt.getSortFinishTime(), - attempt.getShuffleFinishTime())); - } - taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, - attempt.getFinishTime(), - attempt.getStartTime())); - taskList.append("\t"); - taskList.append(attempt.getHostname()).append("\t"); - taskList.append(attempt.getError()); - String taskLogsUrl = getTaskLogsUrl( - WebAppUtils.getHttpSchemePrefix(fs.getConf()), attempt); - taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a"); - System.out.println(taskList.toString()); - } - } - } - } - - private void printTaskSummary() { - SummarizedJob ts = new SummarizedJob(job); - StringBuffer taskSummary = new StringBuffer(); - taskSummary.append("\nTask Summary"); - taskSummary.append("\n============================"); - taskSummary.append("\nKind\tTotal\t"); - taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime"); - taskSummary.append("\n"); - taskSummary.append("\nSetup\t").append(ts.totalSetups); - taskSummary.append("\t").append(ts.numFinishedSetups); - taskSummary.append("\t\t").append(ts.numFailedSetups); - taskSummary.append("\t").append(ts.numKilledSetups); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.setupStarted, 0)); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.setupFinished, ts.setupStarted)); - taskSummary.append("\nMap\t").append(ts.totalMaps); - taskSummary.append("\t").append(job.getFinishedMaps()); - taskSummary.append("\t\t").append(ts.numFailedMaps); - taskSummary.append("\t").append(ts.numKilledMaps); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.mapStarted, 0)); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.mapFinished, ts.mapStarted)); - taskSummary.append("\nReduce\t").append(ts.totalReduces); - taskSummary.append("\t").append(job.getFinishedReduces()); - taskSummary.append("\t\t").append(ts.numFailedReduces); - taskSummary.append("\t").append(ts.numKilledReduces); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.reduceStarted, 0)); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.reduceFinished, ts.reduceStarted)); - taskSummary.append("\nCleanup\t").append(ts.totalCleanups); - taskSummary.append("\t").append(ts.numFinishedCleanups); - taskSummary.append("\t\t").append(ts.numFailedCleanups); - taskSummary.append("\t").append(ts.numKilledCleanups); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.cleanupStarted, 0)); - taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, ts.cleanupFinished, - ts.cleanupStarted)); - taskSummary.append("\n============================\n"); - System.out.println(taskSummary.toString()); - } - - private void printJobAnalysis() { - if (!job.getJobStatus().equals - (JobStatus.getJobRunState(JobStatus.SUCCEEDED))) { - System.out.println("No Analysis available as job did not finish"); - return; - } - - AnalyzedJob avg = new AnalyzedJob(job); - - System.out.println("\nAnalysis"); - System.out.println("========="); - printAnalysis(avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), 10); - printLast(avg.getMapTasks(), "map", cFinishMapRed); - - if (avg.getReduceTasks().length > 0) { - printAnalysis(avg.getReduceTasks(), cShuffle, "shuffle", - avg.getAvgShuffleTime(), 10); - printLast(avg.getReduceTasks(), "shuffle", cFinishShuffle); - - printAnalysis(avg.getReduceTasks(), cReduce, "reduce", - avg.getAvgReduceTime(), 10); - printLast(avg.getReduceTasks(), "reduce", cFinishMapRed); - } - System.out.println("========="); - } - - private void printAnalysis(JobHistoryParser.TaskAttemptInfo [] tasks, - Comparator cmp, - String taskType, - long avg, - int showTasks) { - Arrays.sort(tasks, cmp); - JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1]; - StringBuffer details = new StringBuffer(); - details.append("\nTime taken by best performing "); - details.append(taskType).append(" task "); - details.append(min.getAttemptId().getTaskID().toString()).append(": "); - if ("map".equals(taskType)) { - details.append(StringUtils.formatTimeDiff( - min.getFinishTime(), - min.getStartTime())); - } else if ("shuffle".equals(taskType)) { - details.append(StringUtils.formatTimeDiff( - min.getShuffleFinishTime(), - min.getStartTime())); - } else { - details.append(StringUtils.formatTimeDiff( - min.getFinishTime(), - min.getShuffleFinishTime())); - } - details.append("\nAverage time taken by "); - details.append(taskType).append(" tasks: "); - details.append(StringUtils.formatTimeDiff(avg, 0)); - details.append("\nWorse performing "); - details.append(taskType).append(" tasks: "); - details.append("\nTaskId\t\tTimetaken"); - System.out.println(details.toString()); - for (int i = 0; i < showTasks && i < tasks.length; i++) { - details.setLength(0); - details.append(tasks[i].getAttemptId().getTaskID()).append(" "); - if ("map".equals(taskType)) { - details.append(StringUtils.formatTimeDiff( - tasks[i].getFinishTime(), - tasks[i].getStartTime())); - } else if ("shuffle".equals(taskType)) { - details.append(StringUtils.formatTimeDiff( - tasks[i].getShuffleFinishTime(), - tasks[i].getStartTime())); - } else { - details.append(StringUtils.formatTimeDiff( - tasks[i].getFinishTime(), - tasks[i].getShuffleFinishTime())); - } - System.out.println(details.toString()); - } - } - - private void printLast(JobHistoryParser.TaskAttemptInfo [] tasks, - String taskType, - Comparator cmp - ) { - Arrays.sort(tasks, cFinishMapRed); - JobHistoryParser.TaskAttemptInfo last = tasks[0]; - StringBuffer lastBuf = new StringBuffer(); - lastBuf.append("The last ").append(taskType); - lastBuf.append(" task ").append(last.getAttemptId().getTaskID()); - Long finishTime; - if ("shuffle".equals(taskType)) { - finishTime = last.getShuffleFinishTime(); - } else { - finishTime = last.getFinishTime(); - } - lastBuf.append(" finished at (relative to the Job launch time): "); - lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat, - finishTime, job.getLaunchTime())); - System.out.println(lastBuf.toString()); - } - - private void printTasks(TaskType taskType, String status) { - Map tasks = job.getAllTasks(); - StringBuffer header = new StringBuffer(); - header.append("\n").append(status).append(" "); - header.append(taskType).append(" task list for ").append(jobId); - header.append("\nTaskId\t\tStartTime\tFinishTime\tError"); - if (TaskType.MAP.equals(taskType)) { - header.append("\tInputSplits"); - } - header.append("\n===================================================="); - StringBuffer taskList = new StringBuffer(); - for (JobHistoryParser.TaskInfo task : tasks.values()) { - if (taskType.equals(task.getTaskType()) && - (status.equals(task.getTaskStatus()) - || status.equalsIgnoreCase("ALL"))) { - taskList.setLength(0); - taskList.append(task.getTaskId()); - taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, task.getStartTime(), 0)); - taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff( - dateFormat, task.getFinishTime(), - task.getStartTime())); - taskList.append("\t").append(task.getError()); - if (TaskType.MAP.equals(taskType)) { - taskList.append("\t").append(task.getSplitLocations()); - } - if (taskList != null) { - System.out.println(header.toString()); - System.out.println(taskList.toString()); - } - } - } - } - - private void printFailedAttempts(FilteredJob filteredJob) { - Map> badNodes = filteredJob.getFilteredMap(); - StringBuffer attempts = new StringBuffer(); - if (badNodes.size() > 0) { - attempts.append("\n").append(filteredJob.getFilter()); - attempts.append(" task attempts by nodes"); - attempts.append("\nHostname\tFailedTasks"); - attempts.append("\n==============================="); - System.out.println(attempts.toString()); - for (Map.Entry> entry : badNodes.entrySet()) { - String node = entry.getKey(); - Set failedTasks = entry.getValue(); - attempts.setLength(0); - attempts.append(node).append("\t"); - for (TaskID t : failedTasks) { - attempts.append(t).append(", "); - } - System.out.println(attempts.toString()); - } - } + /** + * Print the job/task/attempt summary information to the PrintStream. + * @param ps The PrintStream to print to + * @throws IOException when there is a problem printing the history + */ + public void print(PrintStream ps) throws IOException { + jhvp.print(ps); } /** - * Return the TaskLogsUrl of a particular TaskAttempt + * Return the TaskLogsUrl of a particular TaskAttempt. * - * @param attempt + * @param attempt info about the task attempt * @return the taskLogsUrl. null if http-port or tracker-name or * task-attempt-id are unavailable. */ @@ -480,58 +156,6 @@ public class HistoryViewer { attempt.getAttemptId().toString()); } - private Comparator cMap = - new Comparator() { - public int compare(JobHistoryParser.TaskAttemptInfo t1, - JobHistoryParser.TaskAttemptInfo t2) { - long l1 = t1.getFinishTime() - t1.getStartTime(); - long l2 = t2.getFinishTime() - t2.getStartTime(); - return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); - } - }; - - private Comparator cShuffle = - new Comparator() { - public int compare(JobHistoryParser.TaskAttemptInfo t1, - JobHistoryParser.TaskAttemptInfo t2) { - long l1 = t1.getShuffleFinishTime() - t1.getStartTime(); - long l2 = t2.getShuffleFinishTime() - t2.getStartTime(); - return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); - } - }; - - private Comparator cFinishShuffle = - new Comparator() { - public int compare(JobHistoryParser.TaskAttemptInfo t1, - JobHistoryParser.TaskAttemptInfo t2) { - long l1 = t1.getShuffleFinishTime(); - long l2 = t2.getShuffleFinishTime(); - return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); - } - }; - - private Comparator cFinishMapRed = - new Comparator() { - public int compare(JobHistoryParser.TaskAttemptInfo t1, - JobHistoryParser.TaskAttemptInfo t2) { - long l1 = t1.getFinishTime(); - long l2 = t2.getFinishTime(); - return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); - } - }; - - private Comparator cReduce = - new Comparator() { - public int compare(JobHistoryParser.TaskAttemptInfo t1, - JobHistoryParser.TaskAttemptInfo t2) { - long l1 = t1.getFinishTime() - - t1.getShuffleFinishTime(); - long l2 = t2.getFinishTime() - - t2.getShuffleFinishTime(); - return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); - } - }; - /** * Utility class used the summarize the job. * Used by HistoryViewer and the JobHistory UI. @@ -694,7 +318,6 @@ public class HistoryViewer { * Utility class used while analyzing the job. * Used by HistoryViewer and the JobHistory UI. */ - public static class AnalyzedJob { private long avgMapTime; private long avgReduceTime; @@ -765,7 +388,6 @@ public class HistoryViewer { /** * Utility to filter out events based on the task status - * */ public static class FilteredJob { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewerPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewerPrinter.java new file mode 100644 index 00000000000..406b4391c83 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewerPrinter.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.jobhistory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; +import java.io.PrintStream; + +/** + * Used by the {@link HistoryViewer} to print job history in different formats. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +interface HistoryViewerPrinter { + + /** + * Print out the Job History to the given {@link PrintStream}. + * @param ps the {@link PrintStream} to print to + * @throws IOException when a problem occurs while printing + */ + void print(PrintStream ps) throws IOException; +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java new file mode 100644 index 00000000000..30903afd7d2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java @@ -0,0 +1,471 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.jobhistory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.io.PrintStream; +import java.text.DecimalFormat; +import java.text.Format; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; + +/** + * Used by the {@link HistoryViewer} to print job history in a human-readable + * format. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class HumanReadableHistoryViewerPrinter implements HistoryViewerPrinter { + + private JobHistoryParser.JobInfo job; + private final SimpleDateFormat dateFormat; + private boolean printAll; + private String scheme; + + HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job, + boolean printAll, String scheme) { + this.job = job; + this.printAll = printAll; + this.scheme = scheme; + this.dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss"); + } + + HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job, + boolean printAll, String scheme, + TimeZone tz) { + this(job, printAll, scheme); + this.dateFormat.setTimeZone(tz); + } + + /** + * Print out the Job History to the given {@link PrintStream} in a + * human-readable format. + * @param ps the {@link PrintStream} to print to + * @throws IOException when a problem occurs while printing + */ + @Override + public void print(PrintStream ps) throws IOException { + printJobDetails(ps); + printTaskSummary(ps); + printJobAnalysis(ps); + printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString()); + printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString()); + printTasks(ps, TaskType.MAP, TaskStatus.State.FAILED.toString()); + printTasks(ps, TaskType.MAP, TaskStatus.State.KILLED.toString()); + printTasks(ps, TaskType.REDUCE, TaskStatus.State.FAILED.toString()); + printTasks(ps, TaskType.REDUCE, TaskStatus.State.KILLED.toString()); + printTasks(ps, TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString()); + printTasks(ps, TaskType.JOB_CLEANUP, + JobStatus.getJobRunState(JobStatus.KILLED)); + if (printAll) { + printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString()); + printTasks(ps, TaskType.MAP, TaskStatus.State.SUCCEEDED.toString()); + printTasks(ps, TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString()); + printTasks(ps, TaskType.JOB_CLEANUP, + TaskStatus.State.SUCCEEDED.toString()); + printAllTaskAttempts(ps, TaskType.JOB_SETUP); + printAllTaskAttempts(ps, TaskType.MAP); + printAllTaskAttempts(ps, TaskType.REDUCE); + printAllTaskAttempts(ps, TaskType.JOB_CLEANUP); + } + + HistoryViewer.FilteredJob filter = new HistoryViewer.FilteredJob(job, + TaskStatus.State.FAILED.toString()); + printFailedAttempts(ps, filter); + + filter = new HistoryViewer.FilteredJob(job, + TaskStatus.State.KILLED.toString()); + printFailedAttempts(ps, filter); + } + + private void printJobDetails(PrintStream ps) { + StringBuilder jobDetails = new StringBuilder(); + jobDetails.append("\nHadoop job: ").append(job.getJobId()); + jobDetails.append("\n====================================="); + jobDetails.append("\nUser: ").append(job.getUsername()); + jobDetails.append("\nJobName: ").append(job.getJobname()); + jobDetails.append("\nJobConf: ").append(job.getJobConfPath()); + jobDetails.append("\nSubmitted At: ").append(StringUtils. + getFormattedTimeWithDiff(dateFormat, + job.getSubmitTime(), 0)); + jobDetails.append("\nLaunched At: ").append(StringUtils. + getFormattedTimeWithDiff(dateFormat, + job.getLaunchTime(), + job.getSubmitTime())); + jobDetails.append("\nFinished At: ").append(StringUtils. + getFormattedTimeWithDiff(dateFormat, + job.getFinishTime(), + job.getLaunchTime())); + jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ? + "Incomplete" :job.getJobStatus())); + printJobCounters(jobDetails, job.getTotalCounters(), job.getMapCounters(), + job.getReduceCounters()); + jobDetails.append("\n"); + jobDetails.append("\n====================================="); + ps.println(jobDetails); + } + + private void printJobCounters(StringBuilder buff, Counters totalCounters, + Counters mapCounters, Counters reduceCounters) { + // Killed jobs might not have counters + if (totalCounters != null) { + buff.append("\nCounters: \n\n"); + buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|", + "Group Name", + "Counter name", + "Map Value", + "Reduce Value", + "Total Value")); + buff.append("\n------------------------------------------" + + "---------------------------------------------"); + for (String groupName : totalCounters.getGroupNames()) { + CounterGroup totalGroup = totalCounters.getGroup(groupName); + CounterGroup mapGroup = mapCounters.getGroup(groupName); + CounterGroup reduceGroup = reduceCounters.getGroup(groupName); + + Format decimal = new DecimalFormat(); + Iterator ctrItr = + totalGroup.iterator(); + while (ctrItr.hasNext()) { + org.apache.hadoop.mapreduce.Counter counter = ctrItr.next(); + String name = counter.getName(); + String mapValue = + decimal.format(mapGroup.findCounter(name).getValue()); + String reduceValue = + decimal.format(reduceGroup.findCounter(name).getValue()); + String totalValue = + decimal.format(counter.getValue()); + + buff.append( + String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s", + totalGroup.getDisplayName(), + counter.getDisplayName(), + mapValue, reduceValue, totalValue)); + } + } + } + } + + private void printAllTaskAttempts(PrintStream ps, TaskType taskType) { + Map tasks = job.getAllTasks(); + StringBuilder taskList = new StringBuilder(); + taskList.append("\n").append(taskType); + taskList.append(" task list for ").append(job.getJobId()); + taskList.append("\nTaskId\t\tStartTime"); + if (TaskType.REDUCE.equals(taskType)) { + taskList.append("\tShuffleFinished\tSortFinished"); + } + taskList.append("\tFinishTime\tHostName\tError\tTaskLogs"); + taskList.append("\n===================================================="); + ps.println(taskList.toString()); + for (JobHistoryParser.TaskInfo task : tasks.values()) { + for (JobHistoryParser.TaskAttemptInfo attempt : + task.getAllTaskAttempts().values()) { + if (taskType.equals(task.getTaskType())){ + taskList.setLength(0); + taskList.append(attempt.getAttemptId()).append("\t"); + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getStartTime(), 0)).append("\t"); + if (TaskType.REDUCE.equals(taskType)) { + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getShuffleFinishTime(), + attempt.getStartTime())); + taskList.append("\t"); + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getSortFinishTime(), + attempt.getShuffleFinishTime())); + } + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getFinishTime(), + attempt.getStartTime())); + taskList.append("\t"); + taskList.append(attempt.getHostname()).append("\t"); + taskList.append(attempt.getError()); + String taskLogsUrl = HistoryViewer.getTaskLogsUrl(scheme, attempt); + taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a"); + ps.println(taskList); + } + } + } + } + + private void printTaskSummary(PrintStream ps) { + HistoryViewer.SummarizedJob ts = new HistoryViewer.SummarizedJob(job); + StringBuilder taskSummary = new StringBuilder(); + taskSummary.append("\nTask Summary"); + taskSummary.append("\n============================"); + taskSummary.append("\nKind\tTotal\t"); + taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime"); + taskSummary.append("\n"); + taskSummary.append("\nSetup\t").append(ts.totalSetups); + taskSummary.append("\t").append(ts.numFinishedSetups); + taskSummary.append("\t\t").append(ts.numFailedSetups); + taskSummary.append("\t").append(ts.numKilledSetups); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.setupStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.setupFinished, ts.setupStarted)); + taskSummary.append("\nMap\t").append(ts.totalMaps); + taskSummary.append("\t").append(job.getFinishedMaps()); + taskSummary.append("\t\t").append(ts.numFailedMaps); + taskSummary.append("\t").append(ts.numKilledMaps); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.mapStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.mapFinished, ts.mapStarted)); + taskSummary.append("\nReduce\t").append(ts.totalReduces); + taskSummary.append("\t").append(job.getFinishedReduces()); + taskSummary.append("\t\t").append(ts.numFailedReduces); + taskSummary.append("\t").append(ts.numKilledReduces); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.reduceStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.reduceFinished, ts.reduceStarted)); + taskSummary.append("\nCleanup\t").append(ts.totalCleanups); + taskSummary.append("\t").append(ts.numFinishedCleanups); + taskSummary.append("\t\t").append(ts.numFailedCleanups); + taskSummary.append("\t").append(ts.numKilledCleanups); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.cleanupStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.cleanupFinished, + ts.cleanupStarted)); + taskSummary.append("\n============================\n"); + ps.println(taskSummary); + } + + private void printJobAnalysis(PrintStream ps) { + if (job.getJobStatus().equals( + JobStatus.getJobRunState(JobStatus.SUCCEEDED))) { + HistoryViewer.AnalyzedJob avg = new HistoryViewer.AnalyzedJob(job); + + ps.println("\nAnalysis"); + ps.println("========="); + printAnalysis(ps, avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), + 10); + printLast(ps, avg.getMapTasks(), "map", cFinishMapRed); + + if (avg.getReduceTasks().length > 0) { + printAnalysis(ps, avg.getReduceTasks(), cShuffle, "shuffle", + avg.getAvgShuffleTime(), 10); + printLast(ps, avg.getReduceTasks(), "shuffle", cFinishShuffle); + + printAnalysis(ps, avg.getReduceTasks(), cReduce, "reduce", + avg.getAvgReduceTime(), 10); + printLast(ps, avg.getReduceTasks(), "reduce", cFinishMapRed); + } + ps.println("========="); + } else { + ps.println("No Analysis available as job did not finish"); + } + } + + protected void printAnalysis(PrintStream ps, + JobHistoryParser.TaskAttemptInfo[] tasks, + Comparator cmp, + String taskType, long avg, int showTasks) { + Arrays.sort(tasks, cmp); + JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1]; + StringBuilder details = new StringBuilder(); + details.append("\nTime taken by best performing "); + details.append(taskType).append(" task "); + details.append(min.getAttemptId().getTaskID().toString()).append(": "); + if ("map".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + min.getFinishTime(), + min.getStartTime())); + } else if ("shuffle".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + min.getShuffleFinishTime(), + min.getStartTime())); + } else { + details.append(StringUtils.formatTimeDiff( + min.getFinishTime(), + min.getShuffleFinishTime())); + } + details.append("\nAverage time taken by "); + details.append(taskType).append(" tasks: "); + details.append(StringUtils.formatTimeDiff(avg, 0)); + details.append("\nWorse performing "); + details.append(taskType).append(" tasks: "); + details.append("\nTaskId\t\tTimetaken"); + ps.println(details); + for (int i = 0; i < showTasks && i < tasks.length; i++) { + details.setLength(0); + details.append(tasks[i].getAttemptId().getTaskID()).append(" "); + if ("map".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + tasks[i].getFinishTime(), + tasks[i].getStartTime())); + } else if ("shuffle".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + tasks[i].getShuffleFinishTime(), + tasks[i].getStartTime())); + } else { + details.append(StringUtils.formatTimeDiff( + tasks[i].getFinishTime(), + tasks[i].getShuffleFinishTime())); + } + ps.println(details); + } + } + + protected void printLast(PrintStream ps, + JobHistoryParser.TaskAttemptInfo[] tasks, String taskType, + Comparator cmp) { + Arrays.sort(tasks, cFinishMapRed); + JobHistoryParser.TaskAttemptInfo last = tasks[0]; + StringBuilder lastBuf = new StringBuilder(); + lastBuf.append("The last ").append(taskType); + lastBuf.append(" task ").append(last.getAttemptId().getTaskID()); + Long finishTime; + if ("shuffle".equals(taskType)) { + finishTime = last.getShuffleFinishTime(); + } else { + finishTime = last.getFinishTime(); + } + lastBuf.append(" finished at (relative to the Job launch time): "); + lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + finishTime, job.getLaunchTime())); + ps.println(lastBuf); + } + + private void printTasks(PrintStream ps, TaskType taskType, String status) { + Map tasks = job.getAllTasks(); + StringBuilder header = new StringBuilder(); + header.append("\n").append(status).append(" "); + header.append(taskType).append(" task list for ") + .append(job.getJobId().toString()); + header.append("\nTaskId\t\tStartTime\tFinishTime\tError"); + if (TaskType.MAP.equals(taskType)) { + header.append("\tInputSplits"); + } + header.append("\n===================================================="); + StringBuilder taskList = new StringBuilder(); + for (JobHistoryParser.TaskInfo task : tasks.values()) { + if (taskType.equals(task.getTaskType()) && + (status.equals(task.getTaskStatus()) + || status.equalsIgnoreCase("ALL"))) { + taskList.setLength(0); + taskList.append(task.getTaskId()); + taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, task.getStartTime(), 0)); + taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, task.getFinishTime(), + task.getStartTime())); + taskList.append("\t").append(task.getError()); + if (TaskType.MAP.equals(taskType)) { + taskList.append("\t").append(task.getSplitLocations()); + } + if (taskList != null) { + ps.println(header); + ps.println(taskList); + } + } + } + } + + private void printFailedAttempts(PrintStream ps, + HistoryViewer.FilteredJob filteredJob) { + Map> badNodes = filteredJob.getFilteredMap(); + StringBuilder attempts = new StringBuilder(); + if (badNodes.size() > 0) { + attempts.append("\n").append(filteredJob.getFilter()); + attempts.append(" task attempts by nodes"); + attempts.append("\nHostname\tFailedTasks"); + attempts.append("\n==============================="); + ps.println(attempts); + for (Map.Entry> entry : badNodes.entrySet()) { + String node = entry.getKey(); + Set failedTasks = entry.getValue(); + attempts.setLength(0); + attempts.append(node).append("\t"); + for (TaskID t : failedTasks) { + attempts.append(t).append(", "); + } + ps.println(attempts); + } + } + } + + private static Comparator cMap = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getFinishTime() - t1.getStartTime(); + long l2 = t2.getFinishTime() - t2.getStartTime(); + return Long.compare(l2, l1); + } + }; + + private static Comparator cShuffle = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getShuffleFinishTime() - t1.getStartTime(); + long l2 = t2.getShuffleFinishTime() - t2.getStartTime(); + return Long.compare(l2, l1); + } + }; + + private static Comparator cFinishShuffle = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getShuffleFinishTime(); + long l2 = t2.getShuffleFinishTime(); + return Long.compare(l2, l1); + } + }; + + private static Comparator cFinishMapRed = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getFinishTime(); + long l2 = t2.getFinishTime(); + return Long.compare(l2, l1); + } + }; + + private static Comparator cReduce = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getFinishTime() - t1.getShuffleFinishTime(); + long l2 = t2.getFinishTime() - t2.getShuffleFinishTime(); + return Long.compare(l2, l1); + } + }; +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JSONHistoryViewerPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JSONHistoryViewerPrinter.java new file mode 100644 index 00000000000..456dcf7111e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JSONHistoryViewerPrinter.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.jobhistory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.io.Writer; +import java.util.Iterator; +import java.util.Map; + +/** + * Used by the {@link HistoryViewer} to print job history in a machine-readable + * JSON format. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class JSONHistoryViewerPrinter implements HistoryViewerPrinter { + + private JobHistoryParser.JobInfo job; + private boolean printAll; + private String scheme; + private JSONObject json; + + JSONHistoryViewerPrinter(JobHistoryParser.JobInfo job, boolean printAll, + String scheme) { + this.job = job; + this.printAll = printAll; + this.scheme = scheme; + } + + /** + * Print out the Job History to the given {@link PrintStream} in a + * machine-readable JSON format. + * @param ps the {@link PrintStream} to print to + * @throws IOException when a problem occurs while printing + */ + @Override + public void print(PrintStream ps) throws IOException { + json = new JSONObject(); + + Writer writer = null; + try { + printJobDetails(); + printTaskSummary(); + printTasks(); + + writer = new OutputStreamWriter(ps, "UTF-8"); + json.write(writer); + writer.flush(); + } catch (JSONException je) { + throw new IOException("Failure parsing JSON document: " + je.getMessage(), + je); + } finally { + if (writer != null) { + writer.close(); + } + } + } + + private void printJobDetails() throws JSONException { + json.put("hadoopJob", job.getJobId().toString()); + json.put("user", job.getUsername()); + json.put("jobName", job.getJobname()); + json.put("jobConf", job.getJobConfPath()); + json.put("submittedAt", job.getSubmitTime()); + json.put("launchedAt", job.getLaunchTime()); + json.put("finishedAt", job.getFinishTime()); + json.put("status", ((job.getJobStatus() == null) ? + "Incomplete" :job.getJobStatus())); + printJobCounters(job.getTotalCounters(), job.getMapCounters(), + job.getReduceCounters()); + } + + private void printJobCounters(Counters totalCounters, Counters mapCounters, + Counters reduceCounters) throws JSONException { + // Killed jobs might not have counters + if (totalCounters != null) { + JSONObject jGroups = new JSONObject(); + for (String groupName : totalCounters.getGroupNames()) { + CounterGroup totalGroup = totalCounters.getGroup(groupName); + CounterGroup mapGroup = mapCounters.getGroup(groupName); + CounterGroup reduceGroup = reduceCounters.getGroup(groupName); + + Iterator ctrItr = + totalGroup.iterator(); + JSONArray jGroup = new JSONArray(); + while (ctrItr.hasNext()) { + JSONObject jCounter = new JSONObject(); + org.apache.hadoop.mapreduce.Counter counter = ctrItr.next(); + String name = counter.getName(); + long mapValue = mapGroup.findCounter(name).getValue(); + long reduceValue = reduceGroup.findCounter(name).getValue(); + long totalValue = counter.getValue(); + + jCounter.put("counterName", name); + jCounter.put("mapValue", mapValue); + jCounter.put("reduceValue", reduceValue); + jCounter.put("totalValue", totalValue); + jGroup.put(jCounter); + } + jGroups.put(fixGroupNameForShuffleErrors(totalGroup.getName()), jGroup); + } + json.put("counters", jGroups); + } + } + + private void printTaskSummary() throws JSONException { + HistoryViewer.SummarizedJob ts = new HistoryViewer.SummarizedJob(job); + JSONObject jSums = new JSONObject(); + JSONObject jSumSetup = new JSONObject(); + jSumSetup.put("total", ts.totalSetups); + jSumSetup.put("successful", ts.numFinishedSetups); + jSumSetup.put("failed", ts.numFailedSetups); + jSumSetup.put("killed", ts.numKilledSetups); + jSumSetup.put("startTime", ts.setupStarted); + jSumSetup.put("finishTime", ts.setupFinished); + jSums.put("setup", jSumSetup); + JSONObject jSumMap = new JSONObject(); + jSumMap.put("total", ts.totalMaps); + jSumMap.put("successful", job.getFinishedMaps()); + jSumMap.put("failed", ts.numFailedMaps); + jSumMap.put("killed", ts.numKilledMaps); + jSumMap.put("startTime", ts.mapStarted); + jSumMap.put("finishTime", ts.mapFinished); + jSums.put("map", jSumMap); + JSONObject jSumReduce = new JSONObject(); + jSumReduce.put("total", ts.totalReduces); + jSumReduce.put("successful", job.getFinishedReduces()); + jSumReduce.put("failed", ts.numFailedReduces); + jSumReduce.put("killed", ts.numKilledReduces); + jSumReduce.put("startTime", ts.reduceStarted); + jSumReduce.put("finishTime", ts.reduceFinished); + jSums.put("reduce", jSumReduce); + JSONObject jSumCleanup = new JSONObject(); + jSumCleanup.put("total", ts.totalCleanups); + jSumCleanup.put("successful", ts.numFinishedCleanups); + jSumCleanup.put("failed", ts.numFailedCleanups); + jSumCleanup.put("killed", ts.numKilledCleanups); + jSumCleanup.put("startTime", ts.cleanupStarted); + jSumCleanup.put("finishTime", ts.cleanupFinished); + jSums.put("cleanup", jSumCleanup); + json.put("taskSummary", jSums); + } + + private void printTasks() throws JSONException { + Map tasks = job.getAllTasks(); + JSONArray jTasks = new JSONArray(); + for (JobHistoryParser.TaskInfo task : tasks.values()) { + if (!task.getTaskType().equals(TaskType.TASK_CLEANUP) && + ((printAll && task.getTaskStatus().equals( + TaskStatus.State.SUCCEEDED.toString())) + || task.getTaskStatus().equals(TaskStatus.State.KILLED.toString()) + || task.getTaskStatus().equals(TaskStatus.State.FAILED.toString()))) { + JSONObject jTask = new JSONObject(); + jTask.put("taskId", task.getTaskId().toString()); + jTask.put("type", task.getTaskType().toString()); + jTask.put("status", task.getTaskStatus()); + jTask.put("startTime", task.getStartTime()); + jTask.put("finishTime", task.getFinishTime()); + if (!task.getError().isEmpty()) { + jTask.put("error", task.getError()); + } + if (task.getTaskType().equals(TaskType.MAP)) { + jTask.put("inputSplits", task.getSplitLocations()); + } + if (printAll) { + printTaskCounters(jTask, task.getCounters()); + JSONObject jAtt = new JSONObject(); + for (JobHistoryParser.TaskAttemptInfo attempt : + task.getAllTaskAttempts().values()) { + jAtt.put("attemptId", attempt.getAttemptId()); + jAtt.put("startTime", attempt.getStartTime()); + if (task.getTaskType().equals(TaskType.REDUCE)) { + jAtt.put("shuffleFinished", attempt.getShuffleFinishTime()); + jAtt.put("sortFinished", attempt.getSortFinishTime()); + } + jAtt.put("finishTime", attempt.getFinishTime()); + jAtt.put("hostName", attempt.getHostname()); + if (!attempt.getError().isEmpty()) { + jAtt.put("error", task.getError()); + } + String taskLogsUrl = HistoryViewer.getTaskLogsUrl(scheme, attempt); + if (taskLogsUrl != null) { + jAtt.put("taskLogs", taskLogsUrl); + } + } + jTask.put("attempts", jAtt); + } + jTasks.put(jTask); + } + json.put("tasks", jTasks); + } + } + + private void printTaskCounters(JSONObject jTask, Counters taskCounters) + throws JSONException { + // Killed tasks might not have counters + if (taskCounters != null) { + JSONObject jGroups = new JSONObject(); + for (String groupName : taskCounters.getGroupNames()) { + CounterGroup group = taskCounters.getGroup(groupName); + + Iterator ctrItr = group.iterator(); + JSONArray jGroup = new JSONArray(); + while (ctrItr.hasNext()) { + JSONObject jCounter = new JSONObject(); + org.apache.hadoop.mapreduce.Counter counter = ctrItr.next(); + + jCounter.put("counterName", counter.getName()); + jCounter.put("value", counter.getValue()); + jGroup.put(jCounter); + } + jGroups.put(fixGroupNameForShuffleErrors(group.getName()), jGroup); + } + jTask.put("counters", jGroups); + } + } + + private String fixGroupNameForShuffleErrors(String name) { + String retName = name; + + if (name.equals("Shuffle Errors")) { + retName = "org.apache.hadoop.mapreduce.task.reduce.Fetcher.ShuffleErrors"; + } + + return retName; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index db87d9dda17..ea9c1c0a32f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.mapreduce.tools; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; +import java.io.PrintStream; import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; @@ -92,6 +96,8 @@ public class CLI extends Configured implements Tool { String jobid = null; String taskid = null; String historyFile = null; + String historyOutFile = null; + String historyOutFormat = HistoryViewer.HUMAN_FORMAT; String counterGroupName = null; String counterName = null; JobPriority jp = null; @@ -173,16 +179,51 @@ public class CLI extends Configured implements Tool { nEvents = Integer.parseInt(argv[3]); listEvents = true; } else if ("-history".equals(cmd)) { - if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) { - displayUsage(cmd); - return exitCode; - } viewHistory = true; - if (argv.length == 3 && "all".equals(argv[1])) { + if (argv.length < 2 || argv.length > 7) { + displayUsage(cmd); + return exitCode; + } + + // Some arguments are optional while others are not, and some require + // second arguments. Due to this, the indexing can vary depending on + // what's specified and what's left out, as summarized in the below table: + // [all] [-outfile ] [-format ] + // 1 2 3 4 5 6 + // 1 2 3 4 + // 1 2 3 4 + // 1 2 + // 1 2 3 4 5 + // 1 2 3 + // 1 2 3 + // 1 + + // "all" is optional, but comes first if specified + int index = 1; + if ("all".equals(argv[index])) { + index++; viewAllHistory = true; - historyFile = argv[2]; - } else { - historyFile = argv[1]; + if (argv.length == 2) { + displayUsage(cmd); + return exitCode; + } + } + // Get the job history file argument + historyFile = argv[index++]; + // "-outfile" is optional, but if specified requires a second argument + if (argv.length > index + 1 && "-outfile".equals(argv[index])) { + index++; + historyOutFile = argv[index++]; + } + // "-format" is optional, but if specified required a second argument + if (argv.length > index + 1 && "-format".equals(argv[index])) { + index++; + historyOutFormat = argv[index++]; + } + // Check for any extra arguments that don't belong here + if (argv.length > index) { + displayUsage(cmd); + return exitCode; } } else if ("-list".equals(cmd)) { if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) { @@ -338,7 +379,8 @@ public class CLI extends Configured implements Tool { exitCode = 0; } } else if (viewHistory) { - viewHistory(historyFile, viewAllHistory); + viewHistory(historyFile, viewAllHistory, historyOutFile, + historyOutFormat); exitCode = 0; } else if (listEvents) { listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents); @@ -451,7 +493,8 @@ public class CLI extends Configured implements Tool { System.err.println(prefix + "[" + cmd + " <#-of-events>]. Event #s start from 1."); } else if ("-history".equals(cmd)) { - System.err.println(prefix + "[" + cmd + " ]"); + System.err.println(prefix + "[" + cmd + + " [all] [-outfile ] [-format ]]"); } else if ("-list".equals(cmd)) { System.err.println(prefix + "[" + cmd + " [all]]"); } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) { @@ -484,7 +527,8 @@ public class CLI extends Configured implements Tool { "Valid values for priorities are: " + jobPriorityValues + ". In addition to this, integers also can be used." + "%n"); System.err.printf("\t[-events <#-of-events>]%n"); - System.err.printf("\t[-history ]%n"); + System.err.printf("\t[-history [all] [-outfile ]" + + " [-format ]]%n"); System.err.printf("\t[-list [all]]%n"); System.err.printf("\t[-list-active-trackers]%n"); System.err.printf("\t[-list-blacklisted-trackers]%n"); @@ -499,11 +543,16 @@ public class CLI extends Configured implements Tool { } } - private void viewHistory(String historyFile, boolean all) - throws IOException { + private void viewHistory(String historyFile, boolean all, + String historyOutFile, String format) throws IOException { HistoryViewer historyViewer = new HistoryViewer(historyFile, - getConf(), all); - historyViewer.print(); + getConf(), all, format); + PrintStream ps = System.out; + if (historyOutFile != null) { + ps = new PrintStream(new BufferedOutputStream(new FileOutputStream( + new File(historyOutFile))), true, "UTF-8"); + } + historyViewer.print(ps); } protected long getCounter(Counters counters, String counterGroupName, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md index 7d6f5ffefc9..dda2ddef913 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md @@ -81,7 +81,7 @@ Copy file or directories recursively. More information can be found at Command to interact with Map Reduce Jobs. -Usage: `mapred job | [GENERIC_OPTIONS] | [-submit ] | [-status ] | [-counter ] | [-kill ] | [-events <#-of-events>] | [-history [all] ] | [-list [all]] | [-kill-task ] | [-fail-task ] | [-set-priority ] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids ] [-logs ]` +Usage: `mapred job | [GENERIC_OPTIONS] | [-submit ] | [-status ] | [-counter ] | [-kill ] | [-events <#-of-events>] | [-history [all] [-outfile ] [-format ]] | [-list [all]] | [-kill-task ] | [-fail-task ] | [-set-priority ] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids ] [-logs ]` | COMMAND\_OPTION | Description | |:---- |:---- | @@ -90,7 +90,7 @@ Usage: `mapred job | [GENERIC_OPTIONS] | [-submit ] | [-status (); + addTaskInfo(job, TaskType.JOB_SETUP, 1, TaskStatus.State.SUCCEEDED); + addTaskInfo(job, TaskType.MAP, 2, TaskStatus.State.FAILED); + addTaskInfo(job, TaskType.MAP, 3, TaskStatus.State.SUCCEEDED); + addTaskInfo(job, TaskType.MAP, 4, TaskStatus.State.SUCCEEDED); + addTaskInfo(job, TaskType.MAP, 5, TaskStatus.State.SUCCEEDED); + addTaskInfo(job, TaskType.MAP, 6, TaskStatus.State.SUCCEEDED); + addTaskInfo(job, TaskType.MAP, 7, TaskStatus.State.SUCCEEDED); + addTaskInfo(job, TaskType.REDUCE, 8, TaskStatus.State.SUCCEEDED); + addTaskInfo(job, TaskType.JOB_CLEANUP, 9, TaskStatus.State.SUCCEEDED); + return job; + } + + private static Counters createCounters() { + Counters counters = new Counters(); + counters.findCounter("group1", "counter1").setValue(5); + counters.findCounter("group1", "counter2").setValue(10); + counters.findCounter("group2", "counter1").setValue(15); + return counters; + } + + private static void addTaskInfo(JobHistoryParser.JobInfo job, + TaskType type, int id, TaskStatus.State status) { + JobHistoryParser.TaskInfo task = new JobHistoryParser.TaskInfo(); + task.taskId = new TaskID(job.getJobId(), type, id); + task.startTime = job.getLaunchTime() + id * 1000; + task.finishTime = task.startTime + id * 1000; + task.taskType = type; + task.counters = createCounters(); + task.status = status.name(); + task.attemptsMap = new HashMap<>(); + addTaskAttemptInfo(task, 1); + job.tasksMap.put(task.getTaskId(), task); + } + + private static void addTaskAttemptInfo( + JobHistoryParser.TaskInfo task, int id) { + JobHistoryParser.TaskAttemptInfo attempt = + new JobHistoryParser.TaskAttemptInfo(); + attempt.attemptId = new TaskAttemptID( + TaskID.downgrade(task.getTaskId()), id); + attempt.startTime = task.getStartTime(); + attempt.finishTime = task.getFinishTime(); + attempt.shuffleFinishTime = task.getFinishTime(); + attempt.sortFinishTime = task.getFinishTime(); + attempt.mapFinishTime = task.getFinishTime(); + attempt.status = task.getTaskStatus(); + attempt.taskType = task.getTaskType(); + attempt.trackerName = "localhost"; + attempt.httpPort = 1234; + attempt.hostname = "localhost"; + task.attemptsMap.put(attempt.getAttemptId(), attempt); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java index 23822390355..f13c1635df0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; +import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; @@ -29,6 +30,8 @@ import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.PrintStream; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -359,27 +362,152 @@ public class TestMRJobClient extends ClusterMapReduceTestCase { String historyFileUri = new Path(f.getAbsolutePath()) .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toUri() .toString(); - - // bad command - int exitCode = runTool(conf, jc, new String[] { "-history", "pul", - historyFileUri }, out); - assertEquals("Exit code", -1, exitCode); - exitCode = runTool(conf, jc, new String[] { "-history", "all", - historyFileUri }, out); + // Try a bunch of different valid combinations of the command to test + // argument parsing + int exitCode = runTool(conf, jc, new String[] { + "-history", + "all", + historyFileUri, + }, out); assertEquals("Exit code", 0, exitCode); - String line; + checkHistoryHumanOutput(out); + File outFile = File.createTempFile("myout", ".txt"); + exitCode = runTool(conf, jc, new String[] { + "-history", + "all", + historyFileUri, + "-outfile", + outFile.getAbsolutePath() + }, out); + assertEquals("Exit code", 0, exitCode); + checkHistoryHumanFileOutput(out, outFile); + outFile = File.createTempFile("myout", ".txt"); + exitCode = runTool(conf, jc, new String[] { + "-history", + "all", + historyFileUri, + "-outfile", + outFile.getAbsolutePath(), + "-format", + "human" + }, out); + assertEquals("Exit code", 0, exitCode); + checkHistoryHumanFileOutput(out, outFile); + + exitCode = runTool(conf, jc, new String[] { + "-history", + historyFileUri, + "-format", + "human" + }, out); + assertEquals("Exit code", 0, exitCode); + checkHistoryHumanOutput(out); + + exitCode = runTool(conf, jc, new String[] { + "-history", + "all", + historyFileUri, + "-format", + "json" + }, out); + assertEquals("Exit code", 0, exitCode); + checkHistoryJSONOutput(out); + outFile = File.createTempFile("myout", ".txt"); + exitCode = runTool(conf, jc, new String[] { + "-history", + "all", + historyFileUri, + "-outfile", + outFile.getAbsolutePath(), + "-format", + "json" + }, out); + assertEquals("Exit code", 0, exitCode); + checkHistoryJSONFileOutput(out, outFile); + exitCode = runTool(conf, jc, new String[] { + "-history", + historyFileUri, + "-format", + "json" + }, out); + assertEquals("Exit code", 0, exitCode); + checkHistoryJSONOutput(out); + + // Check some bad arguments + exitCode = runTool(conf, jc, new String[] { + "-history", + historyFileUri, + "foo" + }, out); + assertEquals("Exit code", -1, exitCode); + exitCode = runTool(conf, jc, new String[] { + "-history", + historyFileUri, + "-format" + }, out); + assertEquals("Exit code", -1, exitCode); + runTool(conf, jc, new String[] { + "-history", + historyFileUri, + "-outfile", + }, out); + assertEquals("Exit code", -1, exitCode); + try { + runTool(conf, jc, new String[]{ + "-history", + historyFileUri, + "-format", + "foo" + }, out); + fail(); + } catch (IllegalArgumentException e) { + // Expected + } + } + + private void checkHistoryHumanOutput(ByteArrayOutputStream out) + throws IOException, JSONException { BufferedReader br = new BufferedReader(new InputStreamReader( new ByteArrayInputStream(out.toByteArray()))); - int counter = 0; - while ((line = br.readLine()) != null) { - LOG.info("line = " + line); - if (line.startsWith("task_")) { - counter++; - } - } - assertEquals(23, counter); + br.readLine(); + String line = br.readLine(); + br.close(); + assertEquals("Hadoop job: job_1329348432655_0001", line); + out.reset(); } + + private void checkHistoryJSONOutput(ByteArrayOutputStream out) + throws IOException, JSONException { + BufferedReader br = new BufferedReader(new InputStreamReader( + new ByteArrayInputStream(out.toByteArray()))); + String line = org.apache.commons.io.IOUtils.toString(br); + br.close(); + JSONObject json = new JSONObject(line); + assertEquals("job_1329348432655_0001", json.getString("hadoopJob")); + out.reset(); + } + + private void checkHistoryHumanFileOutput(ByteArrayOutputStream out, + File outFile) throws IOException, JSONException { + BufferedReader br = new BufferedReader(new FileReader(outFile)); + br.readLine(); + String line = br.readLine(); + br.close(); + assertEquals("Hadoop job: job_1329348432655_0001", line); + assertEquals(0, out.size()); + } + + private void checkHistoryJSONFileOutput(ByteArrayOutputStream out, + File outFile) throws IOException, JSONException { + BufferedReader br = new BufferedReader(new FileReader(outFile)); + String line = org.apache.commons.io.IOUtils.toString(br); + br.close(); + JSONObject json = new JSONObject(line); + assertEquals("job_1329348432655_0001", json.getString("hadoopJob")); + assertEquals(0, out.size()); + } + /** * print job events list */ diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 607857343e0..f23b46ed81a 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -992,6 +992,12 @@ + + org.skyscreamer + jsonassert + 1.3.0 + +