MAPREDUCE-6627. Add machine-readable output to mapred job -history command (rkanter)

Robert Kanter 2016-02-18 17:58:26 -08:00
parent 8ab7658025
commit 8eee59ce6b
11 changed files with 1986 additions and 462 deletions


@@ -313,6 +313,9 @@ Release 2.9.0 - UNRELEASED
MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
mapreduce. (Sidharta Seethana via vvasudev)
MAPREDUCE-6627. Add machine-readable output to mapred job -history
command (rkanter)
OPTIMIZATIONS
BUG FIXES


@@ -42,6 +42,11 @@
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
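<!-- jsonassert is added (test scope) to back the JSON assertions in the
     new TestHistoryViewerPrinter (org.skyscreamer.jsonassert.JSONAssert). -->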
<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>


@@ -19,13 +19,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -37,46 +32,52 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.util.HostUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
/**
* HistoryViewer is used to parse and view the JobHistory files
*
* HistoryViewer is used to parse and view the JobHistory files. They can be
* printed in human-readable format or machine-readable JSON format using the
* {@link HistoryViewerPrinter}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HistoryViewer {
private static final Log LOG = LogFactory.getLog(HistoryViewer.class);
private static final SimpleDateFormat dateFormat =
new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
private FileSystem fs;
private JobInfo job;
private String jobId;
private boolean printAll;
private HistoryViewerPrinter jhvp;
public static final String HUMAN_FORMAT = "human";
public static final String JSON_FORMAT = "json";
/**
* Constructs the HistoryViewer object
* @param historyFile The fully qualified Path of the History File
* @param conf The Configuration file
* @param printAll Toggle to print all statuses or only killed/failed statuses
* @throws IOException
*/
public HistoryViewer(String historyFile,
Configuration conf,
/**
* Constructs the HistoryViewer object.
* @param historyFile the fully qualified Path of the History File
* @param conf the Configuration file
* @param printAll toggle to print all statuses or only killed/failed statuses
* @throws IOException when there is a problem parsing the history file
*/
public HistoryViewer(String historyFile, Configuration conf,
boolean printAll) throws IOException {
this.printAll = printAll;
this(historyFile, conf, printAll, HUMAN_FORMAT);
}
/**
* Constructs the HistoryViewer object.
* @param historyFile the fully qualified Path of the History File
* @param conf the Configuration file
* @param printAll toggle to print all statuses or only killed/failed statuses
* @param format the output format to use
* @throws IOException when there is a problem parsing the history file
*/
public HistoryViewer(String historyFile, Configuration conf, boolean printAll,
String format) throws IOException {
String errorMsg = "Unable to initialize History Viewer";
try {
Path jobFile = new Path(historyFile);
@@ -101,366 +102,41 @@ public class HistoryViewer {
}
JobHistoryParser parser = new JobHistoryParser(fs, jobFile);
job = parser.parse();
jobId = job.getJobId().toString();
} catch(Exception e) {
String scheme = WebAppUtils.getHttpSchemePrefix(fs.getConf());
if (HUMAN_FORMAT.equalsIgnoreCase(format)) {
jhvp = new HumanReadableHistoryViewerPrinter(job, printAll, scheme);
} else if (JSON_FORMAT.equalsIgnoreCase(format)) {
jhvp = new JSONHistoryViewerPrinter(job, printAll, scheme);
} else {
System.err.println("Invalid format specified: " + format);
throw new IllegalArgumentException(errorMsg);
}
} catch(IOException e) {
throw new IOException(errorMsg, e);
}
}
/**
* Print the job/task/attempt summary information
* @throws IOException
* Print the job/task/attempt summary information to stdout.
* @throws IOException when there is a problem printing the history
*/
public void print() throws IOException{
printJobDetails();
printTaskSummary();
printJobAnalysis();
printTasks(TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString());
printTasks(TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString());
printTasks(TaskType.MAP, TaskStatus.State.FAILED.toString());
printTasks(TaskType.MAP, TaskStatus.State.KILLED.toString());
printTasks(TaskType.REDUCE, TaskStatus.State.FAILED.toString());
printTasks(TaskType.REDUCE, TaskStatus.State.KILLED.toString());
printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString());
printTasks(TaskType.JOB_CLEANUP,
JobStatus.getJobRunState(JobStatus.KILLED));
if (printAll) {
printTasks(TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString());
printTasks(TaskType.MAP, TaskStatus.State.SUCCEEDED.toString());
printTasks(TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString());
printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.SUCCEEDED.toString());
printAllTaskAttempts(TaskType.JOB_SETUP);
printAllTaskAttempts(TaskType.MAP);
printAllTaskAttempts(TaskType.REDUCE);
printAllTaskAttempts(TaskType.JOB_CLEANUP);
}
FilteredJob filter = new FilteredJob(job,
TaskStatus.State.FAILED.toString());
printFailedAttempts(filter);
filter = new FilteredJob(job,
TaskStatus.State.KILLED.toString());
printFailedAttempts(filter);
}
private void printJobDetails() {
StringBuffer jobDetails = new StringBuffer();
jobDetails.append("\nHadoop job: " ).append(job.getJobId());
jobDetails.append("\n=====================================");
jobDetails.append("\nUser: ").append(job.getUsername());
jobDetails.append("\nJobName: ").append(job.getJobname());
jobDetails.append("\nJobConf: ").append(job.getJobConfPath());
jobDetails.append("\nSubmitted At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getSubmitTime(), 0));
jobDetails.append("\nLaunched At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getLaunchTime(),
job.getSubmitTime()));
jobDetails.append("\nFinished At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getFinishTime(),
job.getLaunchTime()));
jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ?
"Incomplete" :job.getJobStatus()));
printCounters(jobDetails, job.getTotalCounters(), job.getMapCounters(),
job.getReduceCounters());
jobDetails.append("\n");
jobDetails.append("\n=====================================");
System.out.println(jobDetails.toString());
}
private void printCounters(StringBuffer buff, Counters totalCounters,
Counters mapCounters, Counters reduceCounters) {
// Killed jobs might not have counters
if (totalCounters == null) {
return;
}
buff.append("\nCounters: \n\n");
buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
"Group Name",
"Counter name",
"Map Value",
"Reduce Value",
"Total Value"));
buff.append("\n------------------------------------------"+
"---------------------------------------------");
for (String groupName : totalCounters.getGroupNames()) {
CounterGroup totalGroup = totalCounters.getGroup(groupName);
CounterGroup mapGroup = mapCounters.getGroup(groupName);
CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
Format decimal = new DecimalFormat();
Iterator<org.apache.hadoop.mapreduce.Counter> ctrItr =
totalGroup.iterator();
while(ctrItr.hasNext()) {
org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
String name = counter.getName();
String mapValue =
decimal.format(mapGroup.findCounter(name).getValue());
String reduceValue =
decimal.format(reduceGroup.findCounter(name).getValue());
String totalValue =
decimal.format(counter.getValue());
buff.append(
String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
totalGroup.getDisplayName(),
counter.getDisplayName(),
mapValue, reduceValue, totalValue));
}
}
}
private void printAllTaskAttempts(TaskType taskType) {
Map<TaskID, TaskInfo> tasks = job.getAllTasks();
StringBuffer taskList = new StringBuffer();
taskList.append("\n").append(taskType);
taskList.append(" task list for ").append(job.getJobId());
taskList.append("\nTaskId\t\tStartTime");
if (TaskType.REDUCE.equals(taskType)) {
taskList.append("\tShuffleFinished\tSortFinished");
}
taskList.append("\tFinishTime\tHostName\tError\tTaskLogs");
taskList.append("\n====================================================");
System.out.println(taskList.toString());
for (JobHistoryParser.TaskInfo task : tasks.values()) {
for (JobHistoryParser.TaskAttemptInfo attempt :
task.getAllTaskAttempts().values()) {
if (taskType.equals(task.getTaskType())){
taskList.setLength(0);
taskList.append(attempt.getAttemptId()).append("\t");
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getStartTime(), 0)).append("\t");
if (TaskType.REDUCE.equals(taskType)) {
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getShuffleFinishTime(),
attempt.getStartTime()));
taskList.append("\t");
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getSortFinishTime(),
attempt.getShuffleFinishTime()));
}
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getFinishTime(),
attempt.getStartTime()));
taskList.append("\t");
taskList.append(attempt.getHostname()).append("\t");
taskList.append(attempt.getError());
String taskLogsUrl = getTaskLogsUrl(
WebAppUtils.getHttpSchemePrefix(fs.getConf()), attempt);
taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a");
System.out.println(taskList.toString());
}
}
}
}
private void printTaskSummary() {
SummarizedJob ts = new SummarizedJob(job);
StringBuffer taskSummary = new StringBuffer();
taskSummary.append("\nTask Summary");
taskSummary.append("\n============================");
taskSummary.append("\nKind\tTotal\t");
taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
taskSummary.append("\n");
taskSummary.append("\nSetup\t").append(ts.totalSetups);
taskSummary.append("\t").append(ts.numFinishedSetups);
taskSummary.append("\t\t").append(ts.numFailedSetups);
taskSummary.append("\t").append(ts.numKilledSetups);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.setupStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.setupFinished, ts.setupStarted));
taskSummary.append("\nMap\t").append(ts.totalMaps);
taskSummary.append("\t").append(job.getFinishedMaps());
taskSummary.append("\t\t").append(ts.numFailedMaps);
taskSummary.append("\t").append(ts.numKilledMaps);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.mapStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.mapFinished, ts.mapStarted));
taskSummary.append("\nReduce\t").append(ts.totalReduces);
taskSummary.append("\t").append(job.getFinishedReduces());
taskSummary.append("\t\t").append(ts.numFailedReduces);
taskSummary.append("\t").append(ts.numKilledReduces);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.reduceStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.reduceFinished, ts.reduceStarted));
taskSummary.append("\nCleanup\t").append(ts.totalCleanups);
taskSummary.append("\t").append(ts.numFinishedCleanups);
taskSummary.append("\t\t").append(ts.numFailedCleanups);
taskSummary.append("\t").append(ts.numKilledCleanups);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.cleanupStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.cleanupFinished,
ts.cleanupStarted));
taskSummary.append("\n============================\n");
System.out.println(taskSummary.toString());
}
private void printJobAnalysis() {
if (!job.getJobStatus().equals
(JobStatus.getJobRunState(JobStatus.SUCCEEDED))) {
System.out.println("No Analysis available as job did not finish");
return;
}
AnalyzedJob avg = new AnalyzedJob(job);
System.out.println("\nAnalysis");
System.out.println("=========");
printAnalysis(avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), 10);
printLast(avg.getMapTasks(), "map", cFinishMapRed);
if (avg.getReduceTasks().length > 0) {
printAnalysis(avg.getReduceTasks(), cShuffle, "shuffle",
avg.getAvgShuffleTime(), 10);
printLast(avg.getReduceTasks(), "shuffle", cFinishShuffle);
printAnalysis(avg.getReduceTasks(), cReduce, "reduce",
avg.getAvgReduceTime(), 10);
printLast(avg.getReduceTasks(), "reduce", cFinishMapRed);
}
System.out.println("=========");
}
private void printAnalysis(JobHistoryParser.TaskAttemptInfo [] tasks,
Comparator<JobHistoryParser.TaskAttemptInfo> cmp,
String taskType,
long avg,
int showTasks) {
Arrays.sort(tasks, cmp);
JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1];
StringBuffer details = new StringBuffer();
details.append("\nTime taken by best performing ");
details.append(taskType).append(" task ");
details.append(min.getAttemptId().getTaskID().toString()).append(": ");
if ("map".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
min.getFinishTime(),
min.getStartTime()));
} else if ("shuffle".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
min.getShuffleFinishTime(),
min.getStartTime()));
} else {
details.append(StringUtils.formatTimeDiff(
min.getFinishTime(),
min.getShuffleFinishTime()));
}
details.append("\nAverage time taken by ");
details.append(taskType).append(" tasks: ");
details.append(StringUtils.formatTimeDiff(avg, 0));
details.append("\nWorse performing ");
details.append(taskType).append(" tasks: ");
details.append("\nTaskId\t\tTimetaken");
System.out.println(details.toString());
for (int i = 0; i < showTasks && i < tasks.length; i++) {
details.setLength(0);
details.append(tasks[i].getAttemptId().getTaskID()).append(" ");
if ("map".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
tasks[i].getFinishTime(),
tasks[i].getStartTime()));
} else if ("shuffle".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
tasks[i].getShuffleFinishTime(),
tasks[i].getStartTime()));
} else {
details.append(StringUtils.formatTimeDiff(
tasks[i].getFinishTime(),
tasks[i].getShuffleFinishTime()));
}
System.out.println(details.toString());
}
}
private void printLast(JobHistoryParser.TaskAttemptInfo [] tasks,
String taskType,
Comparator<JobHistoryParser.TaskAttemptInfo> cmp
) {
Arrays.sort(tasks, cFinishMapRed);
JobHistoryParser.TaskAttemptInfo last = tasks[0];
StringBuffer lastBuf = new StringBuffer();
lastBuf.append("The last ").append(taskType);
lastBuf.append(" task ").append(last.getAttemptId().getTaskID());
Long finishTime;
if ("shuffle".equals(taskType)) {
finishTime = last.getShuffleFinishTime();
} else {
finishTime = last.getFinishTime();
}
lastBuf.append(" finished at (relative to the Job launch time): ");
lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
finishTime, job.getLaunchTime()));
System.out.println(lastBuf.toString());
}
private void printTasks(TaskType taskType, String status) {
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
StringBuffer header = new StringBuffer();
header.append("\n").append(status).append(" ");
header.append(taskType).append(" task list for ").append(jobId);
header.append("\nTaskId\t\tStartTime\tFinishTime\tError");
if (TaskType.MAP.equals(taskType)) {
header.append("\tInputSplits");
}
header.append("\n====================================================");
StringBuffer taskList = new StringBuffer();
for (JobHistoryParser.TaskInfo task : tasks.values()) {
if (taskType.equals(task.getTaskType()) &&
(status.equals(task.getTaskStatus())
|| status.equalsIgnoreCase("ALL"))) {
taskList.setLength(0);
taskList.append(task.getTaskId());
taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, task.getStartTime(), 0));
taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, task.getFinishTime(),
task.getStartTime()));
taskList.append("\t").append(task.getError());
if (TaskType.MAP.equals(taskType)) {
taskList.append("\t").append(task.getSplitLocations());
}
if (taskList != null) {
System.out.println(header.toString());
System.out.println(taskList.toString());
}
}
}
}
private void printFailedAttempts(FilteredJob filteredJob) {
Map<String, Set<TaskID>> badNodes = filteredJob.getFilteredMap();
StringBuffer attempts = new StringBuffer();
if (badNodes.size() > 0) {
attempts.append("\n").append(filteredJob.getFilter());
attempts.append(" task attempts by nodes");
attempts.append("\nHostname\tFailedTasks");
attempts.append("\n===============================");
System.out.println(attempts.toString());
for (Map.Entry<String,
Set<TaskID>> entry : badNodes.entrySet()) {
String node = entry.getKey();
Set<TaskID> failedTasks = entry.getValue();
attempts.setLength(0);
attempts.append(node).append("\t");
for (TaskID t : failedTasks) {
attempts.append(t).append(", ");
}
System.out.println(attempts.toString());
}
}
public void print() throws IOException {
print(System.out);
}
/**
* Return the TaskLogsUrl of a particular TaskAttempt
* Print the job/task/attempt summary information to the PrintStream.
* @param ps The PrintStream to print to
* @throws IOException when there is a problem printing the history
*/
public void print(PrintStream ps) throws IOException {
jhvp.print(ps);
}
/**
* Return the TaskLogsUrl of a particular TaskAttempt.
*
* @param attempt
* @param attempt info about the task attempt
* @return the taskLogsUrl. null if http-port or tracker-name or
* task-attempt-id are unavailable.
*/
@@ -480,58 +156,6 @@ public class HistoryViewer {
attempt.getAttemptId().toString());
}
private Comparator<JobHistoryParser.TaskAttemptInfo> cMap =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime() - t1.getStartTime();
long l2 = t2.getFinishTime() - t2.getStartTime();
return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
}
};
private Comparator<JobHistoryParser.TaskAttemptInfo> cShuffle =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getShuffleFinishTime() - t1.getStartTime();
long l2 = t2.getShuffleFinishTime() - t2.getStartTime();
return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
}
};
private Comparator<JobHistoryParser.TaskAttemptInfo> cFinishShuffle =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getShuffleFinishTime();
long l2 = t2.getShuffleFinishTime();
return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
}
};
private Comparator<JobHistoryParser.TaskAttemptInfo> cFinishMapRed =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime();
long l2 = t2.getFinishTime();
return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
}
};
private Comparator<JobHistoryParser.TaskAttemptInfo> cReduce =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime() -
t1.getShuffleFinishTime();
long l2 = t2.getFinishTime() -
t2.getShuffleFinishTime();
return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
}
};
/**
* Utility class used the summarize the job.
* Used by HistoryViewer and the JobHistory UI.
@@ -694,7 +318,6 @@ public class HistoryViewer {
* Utility class used while analyzing the job.
* Used by HistoryViewer and the JobHistory UI.
*/
public static class AnalyzedJob {
private long avgMapTime;
private long avgReduceTime;
@@ -765,7 +388,6 @@ public class HistoryViewer {
/**
* Utility to filter out events based on the task status
*
*/
public static class FilteredJob {


@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.io.PrintStream;
/**
* Used by the {@link HistoryViewer} to print job history in different formats.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
interface HistoryViewerPrinter {
/**
* Print out the Job History to the given {@link PrintStream}.
* @param ps the {@link PrintStream} to print to
* @throws IOException when a problem occurs while printing
*/
void print(PrintStream ps) throws IOException;
}
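The interface is deliberately tiny: HistoryViewer selects a concrete printer once, based on the requested format, and everything downstream sees only HistoryViewerPrinter. A minimal sketch of that dispatch, mirroring the HistoryViewer constructor above (the helper name choosePrinter is ours, not part of the commit, and it would have to live in org.apache.hadoop.mapreduce.jobhistory since both implementations are package-private):
// Sketch only; mirrors the format dispatch in HistoryViewer's constructor.
static HistoryViewerPrinter choosePrinter(JobHistoryParser.JobInfo job,
    boolean printAll, String scheme, String format) {
  if (HistoryViewer.HUMAN_FORMAT.equalsIgnoreCase(format)) {
    return new HumanReadableHistoryViewerPrinter(job, printAll, scheme);
  } else if (HistoryViewer.JSON_FORMAT.equalsIgnoreCase(format)) {
    return new JSONHistoryViewerPrinter(job, printAll, scheme);
  }
  throw new IllegalArgumentException("Invalid format specified: " + format);
}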


@@ -0,0 +1,471 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DecimalFormat;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
/**
* Used by the {@link HistoryViewer} to print job history in a human-readable
* format.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class HumanReadableHistoryViewerPrinter implements HistoryViewerPrinter {
private JobHistoryParser.JobInfo job;
private final SimpleDateFormat dateFormat;
private boolean printAll;
private String scheme;
HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job,
boolean printAll, String scheme) {
this.job = job;
this.printAll = printAll;
this.scheme = scheme;
this.dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
}
HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job,
boolean printAll, String scheme,
TimeZone tz) {
this(job, printAll, scheme);
this.dateFormat.setTimeZone(tz);
}
/**
* Print out the Job History to the given {@link PrintStream} in a
* human-readable format.
* @param ps the {@link PrintStream} to print to
* @throws IOException when a problem occurs while printing
*/
@Override
public void print(PrintStream ps) throws IOException {
printJobDetails(ps);
printTaskSummary(ps);
printJobAnalysis(ps);
printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString());
printTasks(ps, TaskType.MAP, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.MAP, TaskStatus.State.KILLED.toString());
printTasks(ps, TaskType.REDUCE, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.REDUCE, TaskStatus.State.KILLED.toString());
printTasks(ps, TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.JOB_CLEANUP,
JobStatus.getJobRunState(JobStatus.KILLED));
if (printAll) {
printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString());
printTasks(ps, TaskType.MAP, TaskStatus.State.SUCCEEDED.toString());
printTasks(ps, TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString());
printTasks(ps, TaskType.JOB_CLEANUP,
TaskStatus.State.SUCCEEDED.toString());
printAllTaskAttempts(ps, TaskType.JOB_SETUP);
printAllTaskAttempts(ps, TaskType.MAP);
printAllTaskAttempts(ps, TaskType.REDUCE);
printAllTaskAttempts(ps, TaskType.JOB_CLEANUP);
}
HistoryViewer.FilteredJob filter = new HistoryViewer.FilteredJob(job,
TaskStatus.State.FAILED.toString());
printFailedAttempts(ps, filter);
filter = new HistoryViewer.FilteredJob(job,
TaskStatus.State.KILLED.toString());
printFailedAttempts(ps, filter);
}
private void printJobDetails(PrintStream ps) {
StringBuilder jobDetails = new StringBuilder();
jobDetails.append("\nHadoop job: ").append(job.getJobId());
jobDetails.append("\n=====================================");
jobDetails.append("\nUser: ").append(job.getUsername());
jobDetails.append("\nJobName: ").append(job.getJobname());
jobDetails.append("\nJobConf: ").append(job.getJobConfPath());
jobDetails.append("\nSubmitted At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getSubmitTime(), 0));
jobDetails.append("\nLaunched At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getLaunchTime(),
job.getSubmitTime()));
jobDetails.append("\nFinished At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getFinishTime(),
job.getLaunchTime()));
jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ?
"Incomplete" :job.getJobStatus()));
printJobCounters(jobDetails, job.getTotalCounters(), job.getMapCounters(),
job.getReduceCounters());
jobDetails.append("\n");
jobDetails.append("\n=====================================");
ps.println(jobDetails);
}
private void printJobCounters(StringBuilder buff, Counters totalCounters,
Counters mapCounters, Counters reduceCounters) {
// Killed jobs might not have counters
if (totalCounters != null) {
buff.append("\nCounters: \n\n");
buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
"Group Name",
"Counter name",
"Map Value",
"Reduce Value",
"Total Value"));
buff.append("\n------------------------------------------" +
"---------------------------------------------");
for (String groupName : totalCounters.getGroupNames()) {
CounterGroup totalGroup = totalCounters.getGroup(groupName);
CounterGroup mapGroup = mapCounters.getGroup(groupName);
CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
Format decimal = new DecimalFormat();
Iterator<Counter> ctrItr =
totalGroup.iterator();
while (ctrItr.hasNext()) {
org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
String name = counter.getName();
String mapValue =
decimal.format(mapGroup.findCounter(name).getValue());
String reduceValue =
decimal.format(reduceGroup.findCounter(name).getValue());
String totalValue =
decimal.format(counter.getValue());
buff.append(
String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
totalGroup.getDisplayName(),
counter.getDisplayName(),
mapValue, reduceValue, totalValue));
}
}
}
}
private void printAllTaskAttempts(PrintStream ps, TaskType taskType) {
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
StringBuilder taskList = new StringBuilder();
taskList.append("\n").append(taskType);
taskList.append(" task list for ").append(job.getJobId());
taskList.append("\nTaskId\t\tStartTime");
if (TaskType.REDUCE.equals(taskType)) {
taskList.append("\tShuffleFinished\tSortFinished");
}
taskList.append("\tFinishTime\tHostName\tError\tTaskLogs");
taskList.append("\n====================================================");
ps.println(taskList.toString());
for (JobHistoryParser.TaskInfo task : tasks.values()) {
for (JobHistoryParser.TaskAttemptInfo attempt :
task.getAllTaskAttempts().values()) {
if (taskType.equals(task.getTaskType())){
taskList.setLength(0);
taskList.append(attempt.getAttemptId()).append("\t");
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getStartTime(), 0)).append("\t");
if (TaskType.REDUCE.equals(taskType)) {
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getShuffleFinishTime(),
attempt.getStartTime()));
taskList.append("\t");
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getSortFinishTime(),
attempt.getShuffleFinishTime()));
}
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getFinishTime(),
attempt.getStartTime()));
taskList.append("\t");
taskList.append(attempt.getHostname()).append("\t");
taskList.append(attempt.getError());
String taskLogsUrl = HistoryViewer.getTaskLogsUrl(scheme, attempt);
taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a");
ps.println(taskList);
}
}
}
}
private void printTaskSummary(PrintStream ps) {
HistoryViewer.SummarizedJob ts = new HistoryViewer.SummarizedJob(job);
StringBuilder taskSummary = new StringBuilder();
taskSummary.append("\nTask Summary");
taskSummary.append("\n============================");
taskSummary.append("\nKind\tTotal\t");
taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
taskSummary.append("\n");
taskSummary.append("\nSetup\t").append(ts.totalSetups);
taskSummary.append("\t").append(ts.numFinishedSetups);
taskSummary.append("\t\t").append(ts.numFailedSetups);
taskSummary.append("\t").append(ts.numKilledSetups);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.setupStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.setupFinished, ts.setupStarted));
taskSummary.append("\nMap\t").append(ts.totalMaps);
taskSummary.append("\t").append(job.getFinishedMaps());
taskSummary.append("\t\t").append(ts.numFailedMaps);
taskSummary.append("\t").append(ts.numKilledMaps);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.mapStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.mapFinished, ts.mapStarted));
taskSummary.append("\nReduce\t").append(ts.totalReduces);
taskSummary.append("\t").append(job.getFinishedReduces());
taskSummary.append("\t\t").append(ts.numFailedReduces);
taskSummary.append("\t").append(ts.numKilledReduces);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.reduceStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.reduceFinished, ts.reduceStarted));
taskSummary.append("\nCleanup\t").append(ts.totalCleanups);
taskSummary.append("\t").append(ts.numFinishedCleanups);
taskSummary.append("\t\t").append(ts.numFailedCleanups);
taskSummary.append("\t").append(ts.numKilledCleanups);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.cleanupStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.cleanupFinished,
ts.cleanupStarted));
taskSummary.append("\n============================\n");
ps.println(taskSummary);
}
private void printJobAnalysis(PrintStream ps) {
if (job.getJobStatus().equals(
JobStatus.getJobRunState(JobStatus.SUCCEEDED))) {
HistoryViewer.AnalyzedJob avg = new HistoryViewer.AnalyzedJob(job);
ps.println("\nAnalysis");
ps.println("=========");
printAnalysis(ps, avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(),
10);
printLast(ps, avg.getMapTasks(), "map", cFinishMapRed);
if (avg.getReduceTasks().length > 0) {
printAnalysis(ps, avg.getReduceTasks(), cShuffle, "shuffle",
avg.getAvgShuffleTime(), 10);
printLast(ps, avg.getReduceTasks(), "shuffle", cFinishShuffle);
printAnalysis(ps, avg.getReduceTasks(), cReduce, "reduce",
avg.getAvgReduceTime(), 10);
printLast(ps, avg.getReduceTasks(), "reduce", cFinishMapRed);
}
ps.println("=========");
} else {
ps.println("No Analysis available as job did not finish");
}
}
protected void printAnalysis(PrintStream ps,
JobHistoryParser.TaskAttemptInfo[] tasks,
Comparator<JobHistoryParser.TaskAttemptInfo> cmp,
String taskType, long avg, int showTasks) {
Arrays.sort(tasks, cmp);
JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1];
StringBuilder details = new StringBuilder();
details.append("\nTime taken by best performing ");
details.append(taskType).append(" task ");
details.append(min.getAttemptId().getTaskID().toString()).append(": ");
if ("map".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
min.getFinishTime(),
min.getStartTime()));
} else if ("shuffle".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
min.getShuffleFinishTime(),
min.getStartTime()));
} else {
details.append(StringUtils.formatTimeDiff(
min.getFinishTime(),
min.getShuffleFinishTime()));
}
details.append("\nAverage time taken by ");
details.append(taskType).append(" tasks: ");
details.append(StringUtils.formatTimeDiff(avg, 0));
details.append("\nWorse performing ");
details.append(taskType).append(" tasks: ");
details.append("\nTaskId\t\tTimetaken");
ps.println(details);
for (int i = 0; i < showTasks && i < tasks.length; i++) {
details.setLength(0);
details.append(tasks[i].getAttemptId().getTaskID()).append(" ");
if ("map".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
tasks[i].getFinishTime(),
tasks[i].getStartTime()));
} else if ("shuffle".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
tasks[i].getShuffleFinishTime(),
tasks[i].getStartTime()));
} else {
details.append(StringUtils.formatTimeDiff(
tasks[i].getFinishTime(),
tasks[i].getShuffleFinishTime()));
}
ps.println(details);
}
}
protected void printLast(PrintStream ps,
JobHistoryParser.TaskAttemptInfo[] tasks, String taskType,
Comparator<JobHistoryParser.TaskAttemptInfo> cmp) {
Arrays.sort(tasks, cFinishMapRed);
JobHistoryParser.TaskAttemptInfo last = tasks[0];
StringBuilder lastBuf = new StringBuilder();
lastBuf.append("The last ").append(taskType);
lastBuf.append(" task ").append(last.getAttemptId().getTaskID());
Long finishTime;
if ("shuffle".equals(taskType)) {
finishTime = last.getShuffleFinishTime();
} else {
finishTime = last.getFinishTime();
}
lastBuf.append(" finished at (relative to the Job launch time): ");
lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
finishTime, job.getLaunchTime()));
ps.println(lastBuf);
}
private void printTasks(PrintStream ps, TaskType taskType, String status) {
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
StringBuilder header = new StringBuilder();
header.append("\n").append(status).append(" ");
header.append(taskType).append(" task list for ")
.append(job.getJobId().toString());
header.append("\nTaskId\t\tStartTime\tFinishTime\tError");
if (TaskType.MAP.equals(taskType)) {
header.append("\tInputSplits");
}
header.append("\n====================================================");
StringBuilder taskList = new StringBuilder();
for (JobHistoryParser.TaskInfo task : tasks.values()) {
if (taskType.equals(task.getTaskType()) &&
(status.equals(task.getTaskStatus())
|| status.equalsIgnoreCase("ALL"))) {
taskList.setLength(0);
taskList.append(task.getTaskId());
taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, task.getStartTime(), 0));
taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, task.getFinishTime(),
task.getStartTime()));
taskList.append("\t").append(task.getError());
if (TaskType.MAP.equals(taskType)) {
taskList.append("\t").append(task.getSplitLocations());
}
if (taskList != null) {
ps.println(header);
ps.println(taskList);
}
}
}
}
private void printFailedAttempts(PrintStream ps,
HistoryViewer.FilteredJob filteredJob) {
Map<String, Set<TaskID>> badNodes = filteredJob.getFilteredMap();
StringBuilder attempts = new StringBuilder();
if (badNodes.size() > 0) {
attempts.append("\n").append(filteredJob.getFilter());
attempts.append(" task attempts by nodes");
attempts.append("\nHostname\tFailedTasks");
attempts.append("\n===============================");
ps.println(attempts);
for (Map.Entry<String, Set<TaskID>> entry : badNodes.entrySet()) {
String node = entry.getKey();
Set<TaskID> failedTasks = entry.getValue();
attempts.setLength(0);
attempts.append(node).append("\t");
for (TaskID t : failedTasks) {
attempts.append(t).append(", ");
}
ps.println(attempts);
}
}
}
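// Note: the five comparators below all sort in descending order (largest
// value first), so after Arrays.sort() tasks[0] is the slowest or
// latest-finishing attempt and tasks[tasks.length - 1] the fastest;
// printAnalysis() and printLast() above rely on that ordering.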
private static Comparator<JobHistoryParser.TaskAttemptInfo> cMap =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime() - t1.getStartTime();
long l2 = t2.getFinishTime() - t2.getStartTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cShuffle =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getShuffleFinishTime() - t1.getStartTime();
long l2 = t2.getShuffleFinishTime() - t2.getStartTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cFinishShuffle =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getShuffleFinishTime();
long l2 = t2.getShuffleFinishTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cFinishMapRed =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime();
long l2 = t2.getFinishTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cReduce =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime() - t1.getShuffleFinishTime();
long l2 = t2.getFinishTime() - t2.getShuffleFinishTime();
return Long.compare(l2, l1);
}
};
}


@@ -0,0 +1,256 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Writer;
import java.util.Iterator;
import java.util.Map;
/**
* Used by the {@link HistoryViewer} to print job history in a machine-readable
* JSON format.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class JSONHistoryViewerPrinter implements HistoryViewerPrinter {
private JobHistoryParser.JobInfo job;
private boolean printAll;
private String scheme;
private JSONObject json;
JSONHistoryViewerPrinter(JobHistoryParser.JobInfo job, boolean printAll,
String scheme) {
this.job = job;
this.printAll = printAll;
this.scheme = scheme;
}
/**
* Print out the Job History to the given {@link PrintStream} in a
* machine-readable JSON format.
* @param ps the {@link PrintStream} to print to
* @throws IOException when a problem occurs while printing
*/
@Override
public void print(PrintStream ps) throws IOException {
json = new JSONObject();
Writer writer = null;
try {
printJobDetails();
printTaskSummary();
printTasks();
writer = new OutputStreamWriter(ps, "UTF-8");
json.write(writer);
writer.flush();
} catch (JSONException je) {
throw new IOException("Failure parsing JSON document: " + je.getMessage(),
je);
} finally {
if (writer != null) {
writer.close();
}
}
}
private void printJobDetails() throws JSONException {
json.put("hadoopJob", job.getJobId().toString());
json.put("user", job.getUsername());
json.put("jobName", job.getJobname());
json.put("jobConf", job.getJobConfPath());
json.put("submittedAt", job.getSubmitTime());
json.put("launchedAt", job.getLaunchTime());
json.put("finishedAt", job.getFinishTime());
json.put("status", ((job.getJobStatus() == null) ?
"Incomplete" :job.getJobStatus()));
printJobCounters(job.getTotalCounters(), job.getMapCounters(),
job.getReduceCounters());
}
private void printJobCounters(Counters totalCounters, Counters mapCounters,
Counters reduceCounters) throws JSONException {
// Killed jobs might not have counters
if (totalCounters != null) {
JSONObject jGroups = new JSONObject();
for (String groupName : totalCounters.getGroupNames()) {
CounterGroup totalGroup = totalCounters.getGroup(groupName);
CounterGroup mapGroup = mapCounters.getGroup(groupName);
CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
Iterator<Counter> ctrItr =
totalGroup.iterator();
JSONArray jGroup = new JSONArray();
while (ctrItr.hasNext()) {
JSONObject jCounter = new JSONObject();
org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
String name = counter.getName();
long mapValue = mapGroup.findCounter(name).getValue();
long reduceValue = reduceGroup.findCounter(name).getValue();
long totalValue = counter.getValue();
jCounter.put("counterName", name);
jCounter.put("mapValue", mapValue);
jCounter.put("reduceValue", reduceValue);
jCounter.put("totalValue", totalValue);
jGroup.put(jCounter);
}
jGroups.put(fixGroupNameForShuffleErrors(totalGroup.getName()), jGroup);
}
json.put("counters", jGroups);
}
}
private void printTaskSummary() throws JSONException {
HistoryViewer.SummarizedJob ts = new HistoryViewer.SummarizedJob(job);
JSONObject jSums = new JSONObject();
JSONObject jSumSetup = new JSONObject();
jSumSetup.put("total", ts.totalSetups);
jSumSetup.put("successful", ts.numFinishedSetups);
jSumSetup.put("failed", ts.numFailedSetups);
jSumSetup.put("killed", ts.numKilledSetups);
jSumSetup.put("startTime", ts.setupStarted);
jSumSetup.put("finishTime", ts.setupFinished);
jSums.put("setup", jSumSetup);
JSONObject jSumMap = new JSONObject();
jSumMap.put("total", ts.totalMaps);
jSumMap.put("successful", job.getFinishedMaps());
jSumMap.put("failed", ts.numFailedMaps);
jSumMap.put("killed", ts.numKilledMaps);
jSumMap.put("startTime", ts.mapStarted);
jSumMap.put("finishTime", ts.mapFinished);
jSums.put("map", jSumMap);
JSONObject jSumReduce = new JSONObject();
jSumReduce.put("total", ts.totalReduces);
jSumReduce.put("successful", job.getFinishedReduces());
jSumReduce.put("failed", ts.numFailedReduces);
jSumReduce.put("killed", ts.numKilledReduces);
jSumReduce.put("startTime", ts.reduceStarted);
jSumReduce.put("finishTime", ts.reduceFinished);
jSums.put("reduce", jSumReduce);
JSONObject jSumCleanup = new JSONObject();
jSumCleanup.put("total", ts.totalCleanups);
jSumCleanup.put("successful", ts.numFinishedCleanups);
jSumCleanup.put("failed", ts.numFailedCleanups);
jSumCleanup.put("killed", ts.numKilledCleanups);
jSumCleanup.put("startTime", ts.cleanupStarted);
jSumCleanup.put("finishTime", ts.cleanupFinished);
jSums.put("cleanup", jSumCleanup);
json.put("taskSummary", jSums);
}
private void printTasks() throws JSONException {
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
JSONArray jTasks = new JSONArray();
for (JobHistoryParser.TaskInfo task : tasks.values()) {
if (!task.getTaskType().equals(TaskType.TASK_CLEANUP) &&
((printAll && task.getTaskStatus().equals(
TaskStatus.State.SUCCEEDED.toString()))
|| task.getTaskStatus().equals(TaskStatus.State.KILLED.toString())
|| task.getTaskStatus().equals(TaskStatus.State.FAILED.toString()))) {
JSONObject jTask = new JSONObject();
jTask.put("taskId", task.getTaskId().toString());
jTask.put("type", task.getTaskType().toString());
jTask.put("status", task.getTaskStatus());
jTask.put("startTime", task.getStartTime());
jTask.put("finishTime", task.getFinishTime());
if (!task.getError().isEmpty()) {
jTask.put("error", task.getError());
}
if (task.getTaskType().equals(TaskType.MAP)) {
jTask.put("inputSplits", task.getSplitLocations());
}
if (printAll) {
printTaskCounters(jTask, task.getCounters());
JSONArray jAtts = new JSONArray();
for (JobHistoryParser.TaskAttemptInfo attempt :
task.getAllTaskAttempts().values()) {
// Build one JSON object per attempt; reusing a single object across
// iterations would let later attempts overwrite earlier ones.
JSONObject jAtt = new JSONObject();
jAtt.put("attemptId", attempt.getAttemptId());
jAtt.put("startTime", attempt.getStartTime());
if (task.getTaskType().equals(TaskType.REDUCE)) {
jAtt.put("shuffleFinished", attempt.getShuffleFinishTime());
jAtt.put("sortFinished", attempt.getSortFinishTime());
}
jAtt.put("finishTime", attempt.getFinishTime());
jAtt.put("hostName", attempt.getHostname());
if (!attempt.getError().isEmpty()) {
jAtt.put("error", attempt.getError());
}
String taskLogsUrl = HistoryViewer.getTaskLogsUrl(scheme, attempt);
if (taskLogsUrl != null) {
jAtt.put("taskLogs", taskLogsUrl);
}
jAtts.put(jAtt);
}
jTask.put("attempts", jAtts);
}
jTasks.put(jTask);
}
}
json.put("tasks", jTasks);
}
private void printTaskCounters(JSONObject jTask, Counters taskCounters)
throws JSONException {
// Killed tasks might not have counters
if (taskCounters != null) {
JSONObject jGroups = new JSONObject();
for (String groupName : taskCounters.getGroupNames()) {
CounterGroup group = taskCounters.getGroup(groupName);
Iterator<Counter> ctrItr = group.iterator();
JSONArray jGroup = new JSONArray();
while (ctrItr.hasNext()) {
JSONObject jCounter = new JSONObject();
org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
jCounter.put("counterName", counter.getName());
jCounter.put("value", counter.getValue());
jGroup.put(jCounter);
}
jGroups.put(fixGroupNameForShuffleErrors(group.getName()), jGroup);
}
jTask.put("counters", jGroups);
}
}
private String fixGroupNameForShuffleErrors(String name) {
String retName = name;
if (name.equals("Shuffle Errors")) {
retName = "org.apache.hadoop.mapreduce.task.reduce.Fetcher.ShuffleErrors";
}
return retName;
}
}
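Because print() only needs a PrintStream, the JSON document can be captured in memory just as easily as written to stdout; the run() helper in TestHistoryViewerPrinter below appears to do the same thing. A hedged sketch, assuming the caller sits in org.apache.hadoop.mapreduce.jobhistory (the printer is package-private) and `job` is an already-parsed JobHistoryParser.JobInfo:
// Sketch only: capture the printer's output as a JSON string.
// Requires java.io.ByteArrayOutputStream and java.io.PrintStream; the
// enclosing method declares `throws Exception` for brevity.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos, true, "UTF-8");
new JSONHistoryViewerPrinter(job, true, "http://").print(ps);
String jsonStr = baos.toString("UTF-8");
// Top-level keys written by this printer: hadoopJob, user, jobName, jobConf,
// submittedAt, launchedAt, finishedAt, status, taskSummary, tasks, and
// counters (when the job has them).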


@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.mapreduce.tools;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
@@ -92,6 +96,8 @@ public class CLI extends Configured implements Tool {
String jobid = null;
String taskid = null;
String historyFile = null;
String historyOutFile = null;
String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
String counterGroupName = null;
String counterName = null;
JobPriority jp = null;
@@ -173,16 +179,51 @@
nEvents = Integer.parseInt(argv[3]);
listEvents = true;
} else if ("-history".equals(cmd)) {
if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
displayUsage(cmd);
return exitCode;
}
viewHistory = true;
if (argv.length == 3 && "all".equals(argv[1])) {
if (argv.length < 2 || argv.length > 7) {
displayUsage(cmd);
return exitCode;
}
// Some arguments are optional while others are not, and some require
// second arguments. Due to this, the indexing can vary depending on
// what's specified and what's left out, as summarized in the below table:
// [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]
// 1 2 3 4 5 6
// 1 2 3 4
// 1 2 3 4
// 1 2
// 1 2 3 4 5
// 1 2 3
// 1 2 3
// 1
// "all" is optional, but comes first if specified
int index = 1;
if ("all".equals(argv[index])) {
index++;
viewAllHistory = true;
historyFile = argv[2];
} else {
historyFile = argv[1];
if (argv.length == 2) {
displayUsage(cmd);
return exitCode;
}
}
// Get the job history file argument
historyFile = argv[index++];
// "-outfile" is optional, but if specified requires a second argument
if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
index++;
historyOutFile = argv[index++];
}
// "-format" is optional, but if specified required a second argument
if (argv.length > index + 1 && "-format".equals(argv[index])) {
index++;
historyOutFormat = argv[index++];
}
// Check for any extra arguments that don't belong here
if (argv.length > index) {
displayUsage(cmd);
return exitCode;
}
} else if ("-list".equals(cmd)) {
if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
@@ -338,7 +379,8 @@ public class CLI extends Configured implements Tool {
exitCode = 0;
}
} else if (viewHistory) {
viewHistory(historyFile, viewAllHistory);
viewHistory(historyFile, viewAllHistory, historyOutFile,
historyOutFormat);
exitCode = 0;
} else if (listEvents) {
listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
@@ -451,7 +493,8 @@ public class CLI extends Configured implements Tool {
System.err.println(prefix + "[" + cmd +
" <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
} else if ("-history".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
System.err.println(prefix + "[" + cmd +
" [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]]");
} else if ("-list".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " [all]]");
} else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
@@ -484,7 +527,8 @@
"Valid values for priorities are: " + jobPriorityValues +
". In addition to this, integers also can be used." + "%n");
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
System.err.printf("\t[-history <jobHistoryFile>]%n");
System.err.printf("\t[-history [all] <jobHistoryFile> [-outfile <file>]" +
" [-format <human|json>]]%n");
System.err.printf("\t[-list [all]]%n");
System.err.printf("\t[-list-active-trackers]%n");
System.err.printf("\t[-list-blacklisted-trackers]%n");
@@ -499,11 +543,16 @@
}
}
private void viewHistory(String historyFile, boolean all)
throws IOException {
private void viewHistory(String historyFile, boolean all,
String historyOutFile, String format) throws IOException {
HistoryViewer historyViewer = new HistoryViewer(historyFile,
getConf(), all);
historyViewer.print();
getConf(), all, format);
PrintStream ps = System.out;
if (historyOutFile != null) {
ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(
new File(historyOutFile))), true, "UTF-8");
}
historyViewer.print(ps);
}
protected long getCounter(Counters counters, String counterGroupName,


@@ -81,7 +81,7 @@ Copy file or directories recursively. More information can be found at
Command to interact with Map Reduce Jobs.
Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id>] | [-counter <job-id> <group-name> <counter-name>] | [-kill <job-id>] | [-events <job-id> <from-event-#> <#-of-events>] | [-history [all] <jobOutputDir>] | [-list [all]] | [-kill-task <task-id>] | [-fail-task <task-id>] | [-set-priority <job-id> <priority>] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids <job-id> <task-type> <task-state>] [-logs <job-id> <task-attempt-id>]`
Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id>] | [-counter <job-id> <group-name> <counter-name>] | [-kill <job-id>] | [-events <job-id> <from-event-#> <#-of-events>] | [-history [all] <jobHistoryFile> [-outfile <file>] [-format <human|json>]] | [-list [all]] | [-kill-task <task-id>] | [-fail-task <task-id>] | [-set-priority <job-id> <priority>] | [-list-active-trackers] | [-list-blacklisted-trackers] | [-list-attempt-ids <job-id> <task-type> <task-state>] [-logs <job-id> <task-attempt-id>]`
| COMMAND\_OPTION | Description |
|:---- |:---- |
@@ -90,7 +90,7 @@ Usage: `mapred job | [GENERIC_OPTIONS] | [-submit <job-file>] | [-status <job-id
| -counter *job-id* *group-name* *counter-name* | Prints the counter value. |
| -kill *job-id* | Kills the job. |
| -events *job-id* *from-event-\#* *\#-of-events* | Prints the events' details received by jobtracker for the given range. |
| -history [all]*jobOutputDir* | Prints job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed by specifying the [all] option. |
| -history [all] *jobHistoryFile* [-outfile *file*] [-format *human|json*] | Prints job details, failed and killed task details. More details about the job such as successful tasks, task attempts made for each task, task counters, etc can be viewed by specifying the [all] option. An optional file output path (instead of stdout) can be specified. The format defaults to human-readable but can also be changed to JSON with the [-format] option. |
| -list [all] | Displays jobs which are yet to complete. `-list all` displays all jobs. |
| -kill-task *task-id* | Kills the task. Killed tasks are NOT counted against failed attempts. |
| -fail-task *task-id* | Fails the task. Failed tasks are counted against failed attempts. |
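For example, to write the full history of a job as JSON to a local file instead of stdout (paths here are illustrative): `mapred job -history all /tmp/job_1317928501754_0001.jhist -outfile /tmp/history.json -format json`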


@@ -0,0 +1,945 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.Assert;
import org.junit.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.TimeZone;
public class TestHistoryViewerPrinter {
private static final Log LOG = LogFactory.getLog(
TestHistoryViewerPrinter.class);
@Test
public void testHumanPrinter() throws Exception {
JobHistoryParser.JobInfo job = createJobInfo();
HumanReadableHistoryViewerPrinter printer =
new HumanReadableHistoryViewerPrinter(job, false, "http://",
TimeZone.getTimeZone("GMT"));
String outStr = run(printer);
Assert.assertEquals("\n" +
"Hadoop job: job_1317928501754_0001\n" +
"=====================================\n" +
"User: rkanter\n" +
"JobName: my job\n" +
"JobConf: /tmp/job.xml\n" +
"Submitted At: 6-Oct-2011 19:15:01\n" +
"Launched At: 6-Oct-2011 19:15:02 (1sec)\n" +
"Finished At: 6-Oct-2011 19:15:16 (14sec)\n" +
"Status: SUCCEEDED\n" +
"Counters: \n" +
"\n" +
"|Group Name |Counter name |Map Value |Reduce Value|Total Value|\n" +
"---------------------------------------------------------------------------------------\n" +
"|group1 |counter1 |5 |5 |5 \n" +
"|group1 |counter2 |10 |10 |10 \n" +
"|group2 |counter1 |15 |15 |15 \n" +
"\n" +
"=====================================\n" +
"\n" +
"Task Summary\n" +
"============================\n" +
"Kind\tTotal\tSuccessful\tFailed\tKilled\tStartTime\tFinishTime\n" +
"\n" +
"Setup\t1\t1\t\t0\t0\t6-Oct-2011 19:15:03\t6-Oct-2011 19:15:04 (1sec)\n" +
"Map\t6\t5\t\t1\t0\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:16 (12sec)\n" +
"Reduce\t1\t1\t\t0\t0\t6-Oct-2011 19:15:10\t6-Oct-2011 19:15:18 (8sec)\n" +
"Cleanup\t1\t1\t\t0\t0\t6-Oct-2011 19:15:11\t6-Oct-2011 19:15:20 (9sec)\n" +
"============================\n" +
"\n" +
"\n" +
"Analysis\n" +
"=========\n" +
"\n" +
"Time taken by best performing map task task_1317928501754_0001_m_000003: 3sec\n" +
"Average time taken by map tasks: 5sec\n" +
"Worse performing map tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_m_000007 7sec\n" +
"task_1317928501754_0001_m_000006 6sec\n" +
"task_1317928501754_0001_m_000005 5sec\n" +
"task_1317928501754_0001_m_000004 4sec\n" +
"task_1317928501754_0001_m_000003 3sec\n" +
"The last map task task_1317928501754_0001_m_000007 finished at (relative to the Job launch time): 6-Oct-2011 19:15:16 (14sec)\n" +
"\n" +
"Time taken by best performing shuffle task task_1317928501754_0001_r_000008: 8sec\n" +
"Average time taken by shuffle tasks: 8sec\n" +
"Worse performing shuffle tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_r_000008 8sec\n" +
"The last shuffle task task_1317928501754_0001_r_000008 finished at (relative to the Job launch time): 6-Oct-2011 19:15:18 (16sec)\n" +
"\n" +
"Time taken by best performing reduce task task_1317928501754_0001_r_000008: 0sec\n" +
"Average time taken by reduce tasks: 0sec\n" +
"Worse performing reduce tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_r_000008 0sec\n" +
"The last reduce task task_1317928501754_0001_r_000008 finished at (relative to the Job launch time): 6-Oct-2011 19:15:18 (16sec)\n" +
"=========\n" +
"\n" +
"FAILED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000002\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:06 (2sec)\t\t\n" +
"\n" +
"FAILED task attempts by nodes\n" +
"Hostname\tFailedTasks\n" +
"===============================\n" +
"localhost\ttask_1317928501754_0001_m_000002, \n", outStr);
}
@Test
public void testHumanPrinterAll() throws Exception {
JobHistoryParser.JobInfo job = createJobInfo();
HumanReadableHistoryViewerPrinter printer =
new HumanReadableHistoryViewerPrinter(job, true, "http://",
TimeZone.getTimeZone("GMT"));
String outStr = run(printer);
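// The two expected reports below differ only in the ordering of the
// per-task sections, which follows HashMap iteration order and changed
// between JDK 1.7 and 1.8.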
if (System.getProperty("java.version").startsWith("1.7")) {
Assert.assertEquals("\n" +
"Hadoop job: job_1317928501754_0001\n" +
"=====================================\n" +
"User: rkanter\n" +
"JobName: my job\n" +
"JobConf: /tmp/job.xml\n" +
"Submitted At: 6-Oct-2011 19:15:01\n" +
"Launched At: 6-Oct-2011 19:15:02 (1sec)\n" +
"Finished At: 6-Oct-2011 19:15:16 (14sec)\n" +
"Status: SUCCEEDED\n" +
"Counters: \n" +
"\n" +
"|Group Name |Counter name |Map Value |Reduce Value|Total Value|\n" +
"---------------------------------------------------------------------------------------\n" +
"|group1 |counter1 |5 |5 |5 \n" +
"|group1 |counter2 |10 |10 |10 \n" +
"|group2 |counter1 |15 |15 |15 \n" +
"\n" +
"=====================================\n" +
"\n" +
"Task Summary\n" +
"============================\n" +
"Kind\tTotal\tSuccessful\tFailed\tKilled\tStartTime\tFinishTime\n" +
"\n" +
"Setup\t1\t1\t\t0\t0\t6-Oct-2011 19:15:03\t6-Oct-2011 19:15:04 (1sec)\n" +
"Map\t6\t5\t\t1\t0\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:16 (12sec)\n" +
"Reduce\t1\t1\t\t0\t0\t6-Oct-2011 19:15:10\t6-Oct-2011 19:15:18 (8sec)\n" +
"Cleanup\t1\t1\t\t0\t0\t6-Oct-2011 19:15:11\t6-Oct-2011 19:15:20 (9sec)\n" +
"============================\n" +
"\n" +
"\n" +
"Analysis\n" +
"=========\n" +
"\n" +
"Time taken by best performing map task task_1317928501754_0001_m_000003: 3sec\n" +
"Average time taken by map tasks: 5sec\n" +
"Worse performing map tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_m_000007 7sec\n" +
"task_1317928501754_0001_m_000006 6sec\n" +
"task_1317928501754_0001_m_000005 5sec\n" +
"task_1317928501754_0001_m_000004 4sec\n" +
"task_1317928501754_0001_m_000003 3sec\n" +
"The last map task task_1317928501754_0001_m_000007 finished at (relative to the Job launch time): 6-Oct-2011 19:15:16 (14sec)\n" +
"\n" +
"Time taken by best performing shuffle task task_1317928501754_0001_r_000008: 8sec\n" +
"Average time taken by shuffle tasks: 8sec\n" +
"Worse performing shuffle tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_r_000008 8sec\n" +
"The last shuffle task task_1317928501754_0001_r_000008 finished at (relative to the Job launch time): 6-Oct-2011 19:15:18 (16sec)\n" +
"\n" +
"Time taken by best performing reduce task task_1317928501754_0001_r_000008: 0sec\n" +
"Average time taken by reduce tasks: 0sec\n" +
"Worse performing reduce tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_r_000008 0sec\n" +
"The last reduce task task_1317928501754_0001_r_000008 finished at (relative to the Job launch time): 6-Oct-2011 19:15:18 (16sec)\n" +
"=========\n" +
"\n" +
"FAILED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000002\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:06 (2sec)\t\t\n" +
"\n" +
"SUCCEEDED JOB_SETUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\n" +
"====================================================\n" +
"task_1317928501754_0001_s_000001\t6-Oct-2011 19:15:03\t6-Oct-2011 19:15:04 (1sec)\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000006\t6-Oct-2011 19:15:08\t6-Oct-2011 19:15:14 (6sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000005\t6-Oct-2011 19:15:07\t6-Oct-2011 19:15:12 (5sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000004\t6-Oct-2011 19:15:06\t6-Oct-2011 19:15:10 (4sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000003\t6-Oct-2011 19:15:05\t6-Oct-2011 19:15:08 (3sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000007\t6-Oct-2011 19:15:09\t6-Oct-2011 19:15:16 (7sec)\t\t\n" +
"\n" +
"SUCCEEDED REDUCE task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\n" +
"====================================================\n" +
"task_1317928501754_0001_r_000008\t6-Oct-2011 19:15:10\t6-Oct-2011 19:15:18 (8sec)\t\n" +
"\n" +
"SUCCEEDED JOB_CLEANUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\n" +
"====================================================\n" +
"task_1317928501754_0001_c_000009\t6-Oct-2011 19:15:11\t6-Oct-2011 19:15:20 (9sec)\t\n" +
"\n" +
"JOB_SETUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_s_000001_1\t6-Oct-2011 19:15:03\t6-Oct-2011 19:15:04 (1sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_s_000001_1\n" +
"\n" +
"MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_m_000002_1\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:06 (2sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000002_1\n" +
"attempt_1317928501754_0001_m_000006_1\t6-Oct-2011 19:15:08\t6-Oct-2011 19:15:14 (6sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000006_1\n" +
"attempt_1317928501754_0001_m_000005_1\t6-Oct-2011 19:15:07\t6-Oct-2011 19:15:12 (5sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000005_1\n" +
"attempt_1317928501754_0001_m_000004_1\t6-Oct-2011 19:15:06\t6-Oct-2011 19:15:10 (4sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000004_1\n" +
"attempt_1317928501754_0001_m_000003_1\t6-Oct-2011 19:15:05\t6-Oct-2011 19:15:08 (3sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000003_1\n" +
"attempt_1317928501754_0001_m_000007_1\t6-Oct-2011 19:15:09\t6-Oct-2011 19:15:16 (7sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000007_1\n" +
"\n" +
"REDUCE task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tShuffleFinished\tSortFinished\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_r_000008_1\t6-Oct-2011 19:15:10\t6-Oct-2011 19:15:18 (8sec)\t6-Oct-2011 19:15:18 (0sec)6-Oct-2011 19:15:18 (8sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_r_000008_1\n" +
"\n" +
"JOB_CLEANUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_c_000009_1\t6-Oct-2011 19:15:11\t6-Oct-2011 19:15:20 (9sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_c_000009_1\n" +
"\n" +
"FAILED task attempts by nodes\n" +
"Hostname\tFailedTasks\n" +
"===============================\n" +
"localhost\ttask_1317928501754_0001_m_000002, \n", outStr);
} else {
Assert.assertEquals("\n" +
"Hadoop job: job_1317928501754_0001\n" +
"=====================================\n" +
"User: rkanter\n" +
"JobName: my job\n" +
"JobConf: /tmp/job.xml\n" +
"Submitted At: 6-Oct-2011 19:15:01\n" +
"Launched At: 6-Oct-2011 19:15:02 (1sec)\n" +
"Finished At: 6-Oct-2011 19:15:16 (14sec)\n" +
"Status: SUCCEEDED\n" +
"Counters: \n" +
"\n" +
"|Group Name |Counter name |Map Value |Reduce Value|Total Value|\n" +
"---------------------------------------------------------------------------------------\n" +
"|group1 |counter1 |5 |5 |5 \n" +
"|group1 |counter2 |10 |10 |10 \n" +
"|group2 |counter1 |15 |15 |15 \n" +
"\n" +
"=====================================\n" +
"\n" +
"Task Summary\n" +
"============================\n" +
"Kind\tTotal\tSuccessful\tFailed\tKilled\tStartTime\tFinishTime\n" +
"\n" +
"Setup\t1\t1\t\t0\t0\t6-Oct-2011 19:15:03\t6-Oct-2011 19:15:04 (1sec)\n" +
"Map\t6\t5\t\t1\t0\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:16 (12sec)\n" +
"Reduce\t1\t1\t\t0\t0\t6-Oct-2011 19:15:10\t6-Oct-2011 19:15:18 (8sec)\n" +
"Cleanup\t1\t1\t\t0\t0\t6-Oct-2011 19:15:11\t6-Oct-2011 19:15:20 (9sec)\n" +
"============================\n" +
"\n" +
"\n" +
"Analysis\n" +
"=========\n" +
"\n" +
"Time taken by best performing map task task_1317928501754_0001_m_000003: 3sec\n" +
"Average time taken by map tasks: 5sec\n" +
"Worse performing map tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_m_000007 7sec\n" +
"task_1317928501754_0001_m_000006 6sec\n" +
"task_1317928501754_0001_m_000005 5sec\n" +
"task_1317928501754_0001_m_000004 4sec\n" +
"task_1317928501754_0001_m_000003 3sec\n" +
"The last map task task_1317928501754_0001_m_000007 finished at (relative to the Job launch time): 6-Oct-2011 19:15:16 (14sec)\n" +
"\n" +
"Time taken by best performing shuffle task task_1317928501754_0001_r_000008: 8sec\n" +
"Average time taken by shuffle tasks: 8sec\n" +
"Worse performing shuffle tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_r_000008 8sec\n" +
"The last shuffle task task_1317928501754_0001_r_000008 finished at (relative to the Job launch time): 6-Oct-2011 19:15:18 (16sec)\n" +
"\n" +
"Time taken by best performing reduce task task_1317928501754_0001_r_000008: 0sec\n" +
"Average time taken by reduce tasks: 0sec\n" +
"Worse performing reduce tasks: \n" +
"TaskId\t\tTimetaken\n" +
"task_1317928501754_0001_r_000008 0sec\n" +
"The last reduce task task_1317928501754_0001_r_000008 finished at (relative to the Job launch time): 6-Oct-2011 19:15:18 (16sec)\n" +
"=========\n" +
"\n" +
"FAILED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000002\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:06 (2sec)\t\t\n" +
"\n" +
"SUCCEEDED JOB_SETUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\n" +
"====================================================\n" +
"task_1317928501754_0001_s_000001\t6-Oct-2011 19:15:03\t6-Oct-2011 19:15:04 (1sec)\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000007\t6-Oct-2011 19:15:09\t6-Oct-2011 19:15:16 (7sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000006\t6-Oct-2011 19:15:08\t6-Oct-2011 19:15:14 (6sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000005\t6-Oct-2011 19:15:07\t6-Oct-2011 19:15:12 (5sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000004\t6-Oct-2011 19:15:06\t6-Oct-2011 19:15:10 (4sec)\t\t\n" +
"\n" +
"SUCCEEDED MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\tInputSplits\n" +
"====================================================\n" +
"task_1317928501754_0001_m_000003\t6-Oct-2011 19:15:05\t6-Oct-2011 19:15:08 (3sec)\t\t\n" +
"\n" +
"SUCCEEDED REDUCE task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\n" +
"====================================================\n" +
"task_1317928501754_0001_r_000008\t6-Oct-2011 19:15:10\t6-Oct-2011 19:15:18 (8sec)\t\n" +
"\n" +
"SUCCEEDED JOB_CLEANUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tError\n" +
"====================================================\n" +
"task_1317928501754_0001_c_000009\t6-Oct-2011 19:15:11\t6-Oct-2011 19:15:20 (9sec)\t\n" +
"\n" +
"JOB_SETUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_s_000001_1\t6-Oct-2011 19:15:03\t6-Oct-2011 19:15:04 (1sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_s_000001_1\n" +
"\n" +
"MAP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_m_000007_1\t6-Oct-2011 19:15:09\t6-Oct-2011 19:15:16 (7sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000007_1\n" +
"attempt_1317928501754_0001_m_000002_1\t6-Oct-2011 19:15:04\t6-Oct-2011 19:15:06 (2sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000002_1\n" +
"attempt_1317928501754_0001_m_000006_1\t6-Oct-2011 19:15:08\t6-Oct-2011 19:15:14 (6sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000006_1\n" +
"attempt_1317928501754_0001_m_000005_1\t6-Oct-2011 19:15:07\t6-Oct-2011 19:15:12 (5sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000005_1\n" +
"attempt_1317928501754_0001_m_000004_1\t6-Oct-2011 19:15:06\t6-Oct-2011 19:15:10 (4sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000004_1\n" +
"attempt_1317928501754_0001_m_000003_1\t6-Oct-2011 19:15:05\t6-Oct-2011 19:15:08 (3sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000003_1\n" +
"\n" +
"REDUCE task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tShuffleFinished\tSortFinished\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_r_000008_1\t6-Oct-2011 19:15:10\t6-Oct-2011 19:15:18 (8sec)\t6-Oct-2011 19:15:18 (0sec)6-Oct-2011 19:15:18 (8sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_r_000008_1\n" +
"\n" +
"JOB_CLEANUP task list for job_1317928501754_0001\n" +
"TaskId\t\tStartTime\tFinishTime\tHostName\tError\tTaskLogs\n" +
"====================================================\n" +
"attempt_1317928501754_0001_c_000009_1\t6-Oct-2011 19:15:11\t6-Oct-2011 19:15:20 (9sec)\tlocalhost\thttp://t:1234/tasklog?attemptid=attempt_1317928501754_0001_c_000009_1\n" +
"\n" +
"FAILED task attempts by nodes\n" +
"Hostname\tFailedTasks\n" +
"===============================\n" +
"localhost\ttask_1317928501754_0001_m_000002, \n", outStr);
}
}
@Test
public void testJSONPrinter() throws Exception {
JobHistoryParser.JobInfo job = createJobInfo();
JSONHistoryViewerPrinter printer =
new JSONHistoryViewerPrinter(job, false, "http://");
String outStr = run(printer);
JSONAssert.assertEquals("{\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"mapValue\": 5,\n" +
" \"reduceValue\": 5,\n" +
" \"totalValue\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"mapValue\": 10,\n" +
" \"reduceValue\": 10,\n" +
" \"totalValue\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"mapValue\": 15,\n" +
" \"reduceValue\": 15,\n" +
" \"totalValue\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishedAt\": 1317928516754,\n" +
" \"hadoopJob\": \"job_1317928501754_0001\",\n" +
" \"jobConf\": \"/tmp/job.xml\",\n" +
" \"jobName\": \"my job\",\n" +
" \"launchedAt\": 1317928502754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"submittedAt\": 1317928501754,\n" +
" \"taskSummary\": {\n" +
" \"cleanup\": {\n" +
" \"failed\": 0,\n" +
" \"finishTime\": 1317928520754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928511754,\n" +
" \"successful\": 1,\n" +
" \"total\": 1\n" +
" },\n" +
" \"map\": {\n" +
" \"failed\": 1,\n" +
" \"finishTime\": 1317928516754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928504754,\n" +
" \"successful\": 5,\n" +
" \"total\": 6\n" +
" },\n" +
" \"reduce\": {\n" +
" \"failed\": 0,\n" +
" \"finishTime\": 1317928518754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928510754,\n" +
" \"successful\": 1,\n" +
" \"total\": 1\n" +
" },\n" +
" \"setup\": {\n" +
" \"failed\": 0,\n" +
" \"finishTime\": 1317928504754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928503754,\n" +
" \"successful\": 1,\n" +
" \"total\": 1\n" +
" }\n" +
" },\n" +
" \"tasks\": [\n" +
" {\n" +
" \"finishTime\": 1317928506754,\n" +
" \"inputSplits\": \"\",\n" +
" \"startTime\": 1317928504754,\n" +
" \"status\": \"FAILED\",\n" +
" \"taskId\": \"task_1317928501754_0001_m_000002\",\n" +
" \"type\": \"MAP\"\n" +
" }\n" +
" ],\n" +
" \"user\": \"rkanter\"\n" +
"}\n", outStr, JSONCompareMode.NON_EXTENSIBLE);
}
@Test
public void testJSONPrinterAll() throws Exception {
JobHistoryParser.JobInfo job = createJobInfo();
JSONHistoryViewerPrinter printer =
new JSONHistoryViewerPrinter(job, true, "http://");
String outStr = run(printer);
JSONAssert.assertEquals("{\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"mapValue\": 5,\n" +
" \"reduceValue\": 5,\n" +
" \"totalValue\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"mapValue\": 10,\n" +
" \"reduceValue\": 10,\n" +
" \"totalValue\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"mapValue\": 15,\n" +
" \"reduceValue\": 15,\n" +
" \"totalValue\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishedAt\": 1317928516754,\n" +
" \"hadoopJob\": \"job_1317928501754_0001\",\n" +
" \"jobConf\": \"/tmp/job.xml\",\n" +
" \"jobName\": \"my job\",\n" +
" \"launchedAt\": 1317928502754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"submittedAt\": 1317928501754,\n" +
" \"taskSummary\": {\n" +
" \"cleanup\": {\n" +
" \"failed\": 0,\n" +
" \"finishTime\": 1317928520754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928511754,\n" +
" \"successful\": 1,\n" +
" \"total\": 1\n" +
" },\n" +
" \"map\": {\n" +
" \"failed\": 1,\n" +
" \"finishTime\": 1317928516754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928504754,\n" +
" \"successful\": 5,\n" +
" \"total\": 6\n" +
" },\n" +
" \"reduce\": {\n" +
" \"failed\": 0,\n" +
" \"finishTime\": 1317928518754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928510754,\n" +
" \"successful\": 1,\n" +
" \"total\": 1\n" +
" },\n" +
" \"setup\": {\n" +
" \"failed\": 0,\n" +
" \"finishTime\": 1317928504754,\n" +
" \"killed\": 0,\n" +
" \"startTime\": 1317928503754,\n" +
" \"successful\": 1,\n" +
" \"total\": 1\n" +
" }\n" +
" },\n" +
" \"tasks\": [\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_m_000002_1\",\n" +
" \"finishTime\": 1317928506754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928504754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000002_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928506754,\n" +
" \"inputSplits\": \"\",\n" +
" \"startTime\": 1317928504754,\n" +
" \"status\": \"FAILED\",\n" +
" \"taskId\": \"task_1317928501754_0001_m_000002\",\n" +
" \"type\": \"MAP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_s_000001_1\",\n" +
" \"finishTime\": 1317928504754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928503754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_s_000001_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928504754,\n" +
" \"startTime\": 1317928503754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_s_000001\",\n" +
" \"type\": \"JOB_SETUP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_m_000006_1\",\n" +
" \"finishTime\": 1317928514754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928508754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000006_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928514754,\n" +
" \"inputSplits\": \"\",\n" +
" \"startTime\": 1317928508754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_m_000006\",\n" +
" \"type\": \"MAP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_m_000005_1\",\n" +
" \"finishTime\": 1317928512754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928507754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000005_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928512754,\n" +
" \"inputSplits\": \"\",\n" +
" \"startTime\": 1317928507754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_m_000005\",\n" +
" \"type\": \"MAP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_m_000004_1\",\n" +
" \"finishTime\": 1317928510754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928506754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000004_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928510754,\n" +
" \"inputSplits\": \"\",\n" +
" \"startTime\": 1317928506754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_m_000004\",\n" +
" \"type\": \"MAP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_m_000003_1\",\n" +
" \"finishTime\": 1317928508754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928505754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000003_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928508754,\n" +
" \"inputSplits\": \"\",\n" +
" \"startTime\": 1317928505754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_m_000003\",\n" +
" \"type\": \"MAP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_c_000009_1\",\n" +
" \"finishTime\": 1317928520754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928511754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_c_000009_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928520754,\n" +
" \"startTime\": 1317928511754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_c_000009\",\n" +
" \"type\": \"JOB_CLEANUP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_m_000007_1\",\n" +
" \"finishTime\": 1317928516754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"startTime\": 1317928509754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_m_000007_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928516754,\n" +
" \"inputSplits\": \"\",\n" +
" \"startTime\": 1317928509754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_m_000007\",\n" +
" \"type\": \"MAP\"\n" +
" },\n" +
" {\n" +
" \"attempts\": {\n" +
" \"attemptId\": \"attempt_1317928501754_0001_r_000008_1\",\n" +
" \"finishTime\": 1317928518754,\n" +
" \"hostName\": \"localhost\",\n" +
" \"shuffleFinished\": 1317928518754,\n" +
" \"sortFinished\": 1317928518754,\n" +
" \"startTime\": 1317928510754,\n" +
" \"taskLogs\": \"http://t:1234/tasklog?attemptid=attempt_1317928501754_0001_r_000008_1\"\n" +
" },\n" +
" \"counters\": {\n" +
" \"group1\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 5\n" +
" },\n" +
" {\n" +
" \"counterName\": \"counter2\",\n" +
" \"value\": 10\n" +
" }\n" +
" ],\n" +
" \"group2\": [\n" +
" {\n" +
" \"counterName\": \"counter1\",\n" +
" \"value\": 15\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"finishTime\": 1317928518754,\n" +
" \"startTime\": 1317928510754,\n" +
" \"status\": \"SUCCEEDED\",\n" +
" \"taskId\": \"task_1317928501754_0001_r_000008\",\n" +
" \"type\": \"REDUCE\"\n" +
" }\n" +
" ],\n" +
" \"user\": \"rkanter\"\n" +
"}\n", outStr, JSONCompareMode.NON_EXTENSIBLE);
}
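// Renders the given printer into an in-memory stream and returns the
// captured output decoded as UTF-8.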
private String run(HistoryViewerPrinter printer) throws Exception {
ByteArrayOutputStream boas = new ByteArrayOutputStream();
PrintStream out = new PrintStream(boas, true);
printer.print(out);
out.close();
String outStr = boas.toString("UTF-8");
LOG.info("out = " + outStr);
return outStr;
}
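// Builds a synthetic job: one setup task, six maps (task 2 failed), one
// reduce, and one cleanup, each with a single attempt on localhost and an
// identical fixed set of counters.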
private static JobHistoryParser.JobInfo createJobInfo() {
JobHistoryParser.JobInfo job = new JobHistoryParser.JobInfo();
job.submitTime = 1317928501754L;
job.finishTime = job.submitTime + 15000;
job.jobid = JobID.forName("job_1317928501754_0001");
job.username = "rkanter";
job.jobname = "my job";
job.jobQueueName = "my queue";
job.jobConfPath = "/tmp/job.xml";
job.launchTime = job.submitTime + 1000;
job.totalMaps = 5;
job.totalReduces = 1;
job.failedMaps = 1;
job.failedReduces = 0;
job.finishedMaps = 5;
job.finishedReduces = 1;
job.jobStatus = JobStatus.State.SUCCEEDED.name();
job.totalCounters = createCounters();
job.mapCounters = createCounters();
job.reduceCounters = createCounters();
job.tasksMap = new HashMap<>();
addTaskInfo(job, TaskType.JOB_SETUP, 1, TaskStatus.State.SUCCEEDED);
addTaskInfo(job, TaskType.MAP, 2, TaskStatus.State.FAILED);
addTaskInfo(job, TaskType.MAP, 3, TaskStatus.State.SUCCEEDED);
addTaskInfo(job, TaskType.MAP, 4, TaskStatus.State.SUCCEEDED);
addTaskInfo(job, TaskType.MAP, 5, TaskStatus.State.SUCCEEDED);
addTaskInfo(job, TaskType.MAP, 6, TaskStatus.State.SUCCEEDED);
addTaskInfo(job, TaskType.MAP, 7, TaskStatus.State.SUCCEEDED);
addTaskInfo(job, TaskType.REDUCE, 8, TaskStatus.State.SUCCEEDED);
addTaskInfo(job, TaskType.JOB_CLEANUP, 9, TaskStatus.State.SUCCEEDED);
return job;
}
private static Counters createCounters() {
Counters counters = new Counters();
counters.findCounter("group1", "counter1").setValue(5);
counters.findCounter("group1", "counter2").setValue(10);
counters.findCounter("group2", "counter1").setValue(15);
return counters;
}
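// Each task starts id seconds after the job launch time and runs for id
// seconds, so the task number doubles as an easily-checked duration.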
private static void addTaskInfo(JobHistoryParser.JobInfo job,
TaskType type, int id, TaskStatus.State status) {
JobHistoryParser.TaskInfo task = new JobHistoryParser.TaskInfo();
task.taskId = new TaskID(job.getJobId(), type, id);
task.startTime = job.getLaunchTime() + id * 1000;
task.finishTime = task.startTime + id * 1000;
task.taskType = type;
task.counters = createCounters();
task.status = status.name();
task.attemptsMap = new HashMap<>();
addTaskAttemptInfo(task, 1);
job.tasksMap.put(task.getTaskId(), task);
}
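// Attaches a single attempt that mirrors the task's timing (shuffle, sort,
// and map finish times all collapse onto the task finish time) and reports
// from a fixed localhost:1234 tracker for the task log URLs.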
private static void addTaskAttemptInfo(
JobHistoryParser.TaskInfo task, int id) {
JobHistoryParser.TaskAttemptInfo attempt =
new JobHistoryParser.TaskAttemptInfo();
attempt.attemptId = new TaskAttemptID(
TaskID.downgrade(task.getTaskId()), id);
attempt.startTime = task.getStartTime();
attempt.finishTime = task.getFinishTime();
attempt.shuffleFinishTime = task.getFinishTime();
attempt.sortFinishTime = task.getFinishTime();
attempt.mapFinishTime = task.getFinishTime();
attempt.status = task.getTaskStatus();
attempt.taskType = task.getTaskType();
attempt.trackerName = "localhost";
attempt.httpPort = 1234;
attempt.hostname = "localhost";
task.attemptsMap.put(attempt.getAttemptId(), attempt);
}
}
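
Since the JSON report is intended to be machine-readable, a minimal consumer sketch may be useful. It is not part of this commit, assumes only the top-level shape asserted in the tests above (`hadoopJob`, `status`, `taskSummary`), and uses the same Jettison JSONObject API that TestMRJobClient exercises below:

import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

public class HistoryJsonSketch {
  public static void main(String[] args) throws JSONException {
    // Hypothetical input: the object printed by
    // "mapred job -history <jobHistoryFile> -format json".
    String report = "{\"hadoopJob\":\"job_1317928501754_0001\","
        + "\"status\":\"SUCCEEDED\","
        + "\"taskSummary\":{\"map\":{\"total\":6,\"failed\":1}}}";
    JSONObject job = new JSONObject(report);
    JSONObject maps = job.getJSONObject("taskSummary").getJSONObject("map");
    System.out.println(job.getString("hadoopJob") + " "
        + job.getString("status") + ": " + maps.getInt("total")
        + " maps, " + maps.getInt("failed") + " failed");
  }
}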

View File

@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
@ -29,6 +30,8 @@ import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -360,26 +363,151 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toUri()
.toString();
// Try a bunch of different valid combinations of the command to test
// argument parsing
int exitCode = runTool(conf, jc, new String[] {
"-history",
"all",
historyFileUri,
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanOutput(out);
File outFile = File.createTempFile("myout", ".txt");
exitCode = runTool(conf, jc, new String[] {
"-history",
"all",
historyFileUri,
"-outfile",
outFile.getAbsolutePath()
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanFileOutput(out, outFile);
outFile = File.createTempFile("myout", ".txt");
exitCode = runTool(conf, jc, new String[] {
"-history",
"all",
historyFileUri,
"-outfile",
outFile.getAbsolutePath(),
"-format",
"human"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanFileOutput(out, outFile);
exitCode = runTool(conf, jc, new String[] {
"-history",
historyFileUri,
"-format",
"human"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanOutput(out);
exitCode = runTool(conf, jc, new String[] {
"-history",
"all",
historyFileUri,
"-format",
"json"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryJSONOutput(out);
outFile = File.createTempFile("myout", ".txt");
exitCode = runTool(conf, jc, new String[] {
"-history",
"all",
historyFileUri,
"-outfile",
outFile.getAbsolutePath(),
"-format",
"json"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryJSONFileOutput(out, outFile);
exitCode = runTool(conf, jc, new String[] {
"-history",
historyFileUri,
"-format",
"json"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryJSONOutput(out);
// Check some bad arguments
exitCode = runTool(conf, jc, new String[] {
"-history",
historyFileUri,
"foo"
}, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] {
"-history",
historyFileUri,
"-format"
}, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] {
"-history",
historyFileUri,
"-outfile",
}, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[]{
"-history",
historyFileUri,
"-format",
"foo"
}, out);
fail();
} catch (IllegalArgumentException e) {
// Expected
}
}
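// The human-readable report opens with a blank line followed by a
// "Hadoop job: <id>" header line, which is what this helper keys on.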
private void checkHistoryHumanOutput(ByteArrayOutputStream out)
throws IOException, JSONException {
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
br.readLine();
String line = br.readLine();
br.close();
assertEquals("Hadoop job: job_1329348432655_0001", line);
out.reset();
}
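// The JSON report is parsed whole and spot-checked via its "hadoopJob"
// field rather than compared byte-for-byte.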
private void checkHistoryJSONOutput(ByteArrayOutputStream out)
throws IOException, JSONException {
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
String line = org.apache.commons.io.IOUtils.toString(br);
br.close();
JSONObject json = new JSONObject(line);
assertEquals("job_1329348432655_0001", json.getString("hadoopJob"));
out.reset();
}
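// The -outfile variants also assert that nothing leaked to stdout.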
private void checkHistoryHumanFileOutput(ByteArrayOutputStream out,
File outFile) throws IOException, JSONException {
BufferedReader br = new BufferedReader(new FileReader(outFile));
br.readLine();
String line = br.readLine();
br.close();
assertEquals("Hadoop job: job_1329348432655_0001", line);
assertEquals(0, out.size());
}
private void checkHistoryJSONFileOutput(ByteArrayOutputStream out,
File outFile) throws IOException, JSONException {
BufferedReader br = new BufferedReader(new FileReader(outFile));
String line = org.apache.commons.io.IOUtils.toString(br);
br.close();
JSONObject json = new JSONObject(line);
assertEquals("job_1329348432655_0001", json.getString("hadoopJob"));
assertEquals(0, out.size());
}
/**
* print job events list
*/

View File

@ -992,6 +992,12 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
</dependencyManagement>