MAPREDUCE-2672. MR-279: JobHistory Server needs Analysis this job. (Robert Evans via mahadev) - Merging r1171297 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1171299 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-09-15 22:23:11 +00:00
parent 38e0f786b2
commit 66ad5a6154
20 changed files with 687 additions and 134 deletions

View File

@ -269,6 +269,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2966. Added ShutDown hooks for MRV2 processes so that they can MAPREDUCE-2966. Added ShutDown hooks for MRV2 processes so that they can
gracefully exit. (Abhijit Suresh Shingate via vinodkv) gracefully exit. (Abhijit Suresh Shingate via vinodkv)
MAPREDUCE-2672. MR-279: JobHistory Server needs Analysis this job.
(Robert Evans via mahadev)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -38,32 +38,51 @@ public interface TaskAttempt {
float getProgress(); float getProgress();
TaskAttemptState getState(); TaskAttemptState getState();
/** Has attempt reached the final state or not. /**
* Has attempt reached the final state or not.
* @return true if it has finished, else false
*/ */
boolean isFinished(); boolean isFinished();
/**If container Assigned then return container ID, otherwise null. /**
* @return the container ID if a container is assigned, otherwise null.
*/ */
ContainerId getAssignedContainerID(); ContainerId getAssignedContainerID();
/**If container Assigned then return container mgr address, otherwise null. /**
* @return container mgr address if a container is assigned, otherwise null.
*/ */
String getAssignedContainerMgrAddress(); String getAssignedContainerMgrAddress();
/**If container Assigned then return the node's http address, otherwise null. /**
* @return node's http address if a container is assigned, otherwise null.
*/ */
String getNodeHttpAddress(); String getNodeHttpAddress();
/** Returns time at which container is launched. If container is not launched /**
* @return time at which container is launched. If container is not launched
* yet, returns 0. * yet, returns 0.
*/ */
long getLaunchTime(); long getLaunchTime();
/** Returns attempt's finish time. If attempt is not finished /**
* @return attempt's finish time. If attempt is not finished
* yet, returns 0. * yet, returns 0.
*/ */
long getFinishTime(); long getFinishTime();
/**
* @return The attempt's shuffle finish time if the attempt is a reduce. If
* attempt is not finished yet, returns 0.
*/
long getShuffleFinishTime();
/**
* @return The attempt's sort or merge finish time if the attempt is a reduce.
* If attempt is not finished yet, returns 0.
*/
long getSortFinishTime();
/** /**
* @return the port shuffle is on. * @return the port shuffle is on.
*/ */

View File

@ -695,6 +695,25 @@ public abstract class TaskAttemptImpl implements
} }
} }
@Override
public long getShuffleFinishTime() {
readLock.lock();
try {
return this.reportedStatus.shuffleFinishTime;
} finally {
readLock.unlock();
}
}
@Override
public long getSortFinishTime() {
readLock.lock();
try {
return this.reportedStatus.sortFinishTime;
} finally {
readLock.unlock();
}
}
@Override @Override
public int getShufflePort() { public int getShufflePort() {
@ -751,6 +770,7 @@ public abstract class TaskAttemptImpl implements
result.setProgress(reportedStatus.progress); result.setProgress(reportedStatus.progress);
result.setStartTime(launchTime); result.setStartTime(launchTime);
result.setFinishTime(finishTime); result.setFinishTime(finishTime);
result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
result.setDiagnosticInfo(reportedStatus.diagnosticInfo); result.setDiagnosticInfo(reportedStatus.diagnosticInfo);
result.setPhase(reportedStatus.phase); result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString); result.setStateString(reportedStatus.stateString);

View File

@ -22,6 +22,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -260,6 +261,16 @@ public class MockJobs extends MockApps {
public String getAssignedContainerMgrAddress() { public String getAssignedContainerMgrAddress() {
return "localhost:9998"; return "localhost:9998";
} }
@Override
public long getShuffleFinishTime() {
return 0;
}
@Override
public long getSortFinishTime() {
return 0;
}
}; };
} }
@ -454,7 +465,7 @@ public class MockJobs extends MockApps {
@Override @Override
public List<String> getDiagnostics() { public List<String> getDiagnostics() {
throw new UnsupportedOperationException("Not supported yet."); return Collections.<String>emptyList();
} }
@Override @Override
@ -465,7 +476,7 @@ public class MockJobs extends MockApps {
@Override @Override
public String getUserName() { public String getUserName() {
throw new UnsupportedOperationException("Not supported yet."); return "mock";
} }
@Override @Override
@ -475,7 +486,7 @@ public class MockJobs extends MockApps {
@Override @Override
public Map<JobACL, AccessControlList> getJobACLs() { public Map<JobACL, AccessControlList> getJobACLs() {
throw new UnsupportedOperationException("Not supported yet."); return Collections.<JobACL, AccessControlList>emptyMap();
} }
}; };
} }

View File

@ -693,6 +693,16 @@ public class TestRuntimeEstimators {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override
public long getShuffleFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getSortFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override @Override
public String getAssignedContainerMgrAddress() { public String getAssignedContainerMgrAddress() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");

View File

@ -21,12 +21,17 @@ package org.apache.hadoop.mapreduce.v2.app.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID; import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -115,18 +120,42 @@ public class TestAMWebApp {
WebAppTests.testPage(AppView.class, AppContext.class, new TestAppContext()); WebAppTests.testPage(AppView.class, AppContext.class, new TestAppContext());
} }
@Test public void testJobView() { @Test public void testJobView() {
WebAppTests.testPage(JobPage.class, AppContext.class, new TestAppContext()); AppContext appContext = new TestAppContext();
Map<String, String> params = getJobParams(appContext);
WebAppTests.testPage(JobPage.class, AppContext.class, appContext, params);
} }
@Test public void testTasksView() { @Test public void testTasksView() {
WebAppTests.testPage(TasksPage.class, AppContext.class, AppContext appContext = new TestAppContext();
new TestAppContext()); Map<String, String> params = getTaskParams(appContext);
WebAppTests.testPage(TasksPage.class, AppContext.class, appContext, params);
} }
@Test public void testTaskView() { @Test public void testTaskView() {
WebAppTests.testPage(TaskPage.class, AppContext.class, AppContext appContext = new TestAppContext();
new TestAppContext()); Map<String, String> params = getTaskParams(appContext);
WebAppTests.testPage(TaskPage.class, AppContext.class, appContext, params);
}
public static Map<String, String> getJobParams(AppContext appContext) {
JobId jobId = appContext.getAllJobs().entrySet().iterator().next().getKey();
Map<String, String> params = new HashMap<String, String>();
params.put(AMParams.JOB_ID, MRApps.toString(jobId));
return params;
}
public static Map<String, String> getTaskParams(AppContext appContext) {
JobId jobId = appContext.getAllJobs().entrySet().iterator().next().getKey();
Entry<TaskId, Task> e = appContext.getJob(jobId).getTasks().entrySet().iterator().next();
e.getValue().getType();
Map<String, String> params = new HashMap<String, String>();
params.put(AMParams.JOB_ID, MRApps.toString(jobId));
params.put(AMParams.TASK_ID, e.getKey().toString());
params.put(AMParams.TASK_TYPE, MRApps.taskSymbol(e.getValue().getType()));
return params;
} }
public static void main(String[] args) { public static void main(String[] args) {

View File

@ -24,6 +24,10 @@ public interface TaskAttemptReport {
public abstract float getProgress(); public abstract float getProgress();
public abstract long getStartTime(); public abstract long getStartTime();
public abstract long getFinishTime(); public abstract long getFinishTime();
/** @return the shuffle finish time. Applicable only for reduce attempts */
public abstract long getShuffleFinishTime();
/** @return the sort/merge finish time. Applicable only for reduce attempts */
public abstract long getSortFinishTime();
public abstract Counters getCounters(); public abstract Counters getCounters();
public abstract String getDiagnosticInfo(); public abstract String getDiagnosticInfo();
public abstract String getStateString(); public abstract String getStateString();
@ -39,4 +43,14 @@ public interface TaskAttemptReport {
public abstract void setStateString(String stateString); public abstract void setStateString(String stateString);
public abstract void setPhase(Phase phase); public abstract void setPhase(Phase phase);
/**
* Set the shuffle finish time. Applicable only for reduce attempts
* @param time the time the shuffle finished.
*/
public abstract void setShuffleFinishTime(long time);
/**
* Set the sort/merge finish time. Applicable only for reduce attempts
* @param time the time the shuffle finished.
*/
public abstract void setSortFinishTime(long time);
} }

View File

@ -127,6 +127,31 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
maybeInitBuilder(); maybeInitBuilder();
builder.setFinishTime((finishTime)); builder.setFinishTime((finishTime));
} }
@Override
public long getShuffleFinishTime() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getShuffleFinishTime());
}
@Override
public void setShuffleFinishTime(long time) {
maybeInitBuilder();
builder.setShuffleFinishTime(time);
}
@Override
public long getSortFinishTime() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getSortFinishTime());
}
@Override
public void setSortFinishTime(long time) {
maybeInitBuilder();
builder.setSortFinishTime(time);
}
@Override @Override
public TaskAttemptId getTaskAttemptId() { public TaskAttemptId getTaskAttemptId() {
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder; TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
@ -262,7 +287,4 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
private Phase convertFromProtoFormat(PhaseProto e) { private Phase convertFromProtoFormat(PhaseProto e) {
return MRProtoUtils.convertFromProtoFormat(e); return MRProtoUtils.convertFromProtoFormat(e);
} }
} }

View File

@ -132,6 +132,7 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
maybeInitBuilder(); maybeInitBuilder();
builder.setStartTime((startTime)); builder.setStartTime((startTime));
} }
@Override @Override
public long getFinishTime() { public long getFinishTime() {
TaskReportProtoOrBuilder p = viaProto ? proto : builder; TaskReportProtoOrBuilder p = viaProto ? proto : builder;
@ -143,6 +144,7 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
maybeInitBuilder(); maybeInitBuilder();
builder.setFinishTime((finishTime)); builder.setFinishTime((finishTime));
} }
@Override @Override
public TaskId getTaskId() { public TaskId getTaskId() {
TaskReportProtoOrBuilder p = viaProto ? proto : builder; TaskReportProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -117,6 +117,8 @@ message TaskAttemptReportProto {
optional string diagnostic_info = 7; optional string diagnostic_info = 7;
optional string state_string = 8; optional string state_string = 8;
optional PhaseProto phase = 9; optional PhaseProto phase = 9;
optional int64 shuffle_finish_time = 10;
optional int64 sort_finish_time=11;
} }
enum JobStateProto { enum JobStateProto {

View File

@ -379,46 +379,46 @@ public class JobHistoryParser {
} }
} }
/** Get the job submit time */ /** @return the job submit time */
public long getSubmitTime() { return submitTime; } public long getSubmitTime() { return submitTime; }
/** Get the job finish time */ /** @return the job finish time */
public long getFinishTime() { return finishTime; } public long getFinishTime() { return finishTime; }
/** Get the job id */ /** @return the job id */
public JobID getJobId() { return jobid; } public JobID getJobId() { return jobid; }
/** Get the user name */ /** @return the user name */
public String getUsername() { return username; } public String getUsername() { return username; }
/** Get the job name */ /** @return the job name */
public String getJobname() { return jobname; } public String getJobname() { return jobname; }
/** Get the job queue name */ /** @return the job queue name */
public String getJobQueueName() { return jobQueueName; } public String getJobQueueName() { return jobQueueName; }
/** Get the path for the job configuration file */ /** @return the path for the job configuration file */
public String getJobConfPath() { return jobConfPath; } public String getJobConfPath() { return jobConfPath; }
/** Get the job launch time */ /** @return the job launch time */
public long getLaunchTime() { return launchTime; } public long getLaunchTime() { return launchTime; }
/** Get the total number of maps */ /** @return the total number of maps */
public long getTotalMaps() { return totalMaps; } public long getTotalMaps() { return totalMaps; }
/** Get the total number of reduces */ /** @return the total number of reduces */
public long getTotalReduces() { return totalReduces; } public long getTotalReduces() { return totalReduces; }
/** Get the total number of failed maps */ /** @return the total number of failed maps */
public long getFailedMaps() { return failedMaps; } public long getFailedMaps() { return failedMaps; }
/** Get the number of failed reduces */ /** @return the number of failed reduces */
public long getFailedReduces() { return failedReduces; } public long getFailedReduces() { return failedReduces; }
/** Get the number of finished maps */ /** @return the number of finished maps */
public long getFinishedMaps() { return finishedMaps; } public long getFinishedMaps() { return finishedMaps; }
/** Get the number of finished reduces */ /** @return the number of finished reduces */
public long getFinishedReduces() { return finishedReduces; } public long getFinishedReduces() { return finishedReduces; }
/** Get the job status */ /** @return the job status */
public String getJobStatus() { return jobStatus; } public String getJobStatus() { return jobStatus; }
public String getErrorInfo() { return errorInfo; } public String getErrorInfo() { return errorInfo; }
/** Get the counters for the job */ /** @return the counters for the job */
public Counters getTotalCounters() { return totalCounters; } public Counters getTotalCounters() { return totalCounters; }
/** Get the map counters for the job */ /** @return the map counters for the job */
public Counters getMapCounters() { return mapCounters; } public Counters getMapCounters() { return mapCounters; }
/** Get the reduce counters for the job */ /** @return the reduce counters for the job */
public Counters getReduceCounters() { return reduceCounters; } public Counters getReduceCounters() { return reduceCounters; }
/** Get the map of all tasks in this job */ /** @return the map of all tasks in this job */
public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; } public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
/** Get the priority of this job */ /** @return the priority of this job */
public String getPriority() { return priority.toString(); } public String getPriority() { return priority.toString(); }
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; } public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
} }
@ -458,27 +458,27 @@ public class JobHistoryParser {
} }
} }
/** Get the Task ID */ /** @return the Task ID */
public TaskID getTaskId() { return taskId; } public TaskID getTaskId() { return taskId; }
/** Get the start time of this task */ /** @return the start time of this task */
public long getStartTime() { return startTime; } public long getStartTime() { return startTime; }
/** Get the finish time of this task */ /** @return the finish time of this task */
public long getFinishTime() { return finishTime; } public long getFinishTime() { return finishTime; }
/** Get the task type */ /** @return the task type */
public TaskType getTaskType() { return taskType; } public TaskType getTaskType() { return taskType; }
/** Get the split locations */ /** @return the split locations */
public String getSplitLocations() { return splitLocations; } public String getSplitLocations() { return splitLocations; }
/** Get the counters for this task */ /** @return the counters for this task */
public Counters getCounters() { return counters; } public Counters getCounters() { return counters; }
/** Get the task status */ /** @return the task status */
public String getTaskStatus() { return status; } public String getTaskStatus() { return status; }
/** Get the attempt Id that caused this task to fail */ /** @return the attempt Id that caused this task to fail */
public TaskAttemptID getFailedDueToAttemptId() { public TaskAttemptID getFailedDueToAttemptId() {
return failedDueToAttemptId; return failedDueToAttemptId;
} }
/** Get the error */ /** @return the error */
public String getError() { return error; } public String getError() { return error; }
/** Get the map of all attempts for this task */ /** @return the map of all attempts for this task */
public Map<TaskAttemptID, TaskAttemptInfo> getAllTaskAttempts() { public Map<TaskAttemptID, TaskAttemptInfo> getAllTaskAttempts() {
return attemptsMap; return attemptsMap;
} }
@ -530,33 +530,33 @@ public class JobHistoryParser {
} }
} }
/** Get the attempt Id */ /** @return the attempt Id */
public TaskAttemptID getAttemptId() { return attemptId; } public TaskAttemptID getAttemptId() { return attemptId; }
/** Get the start time of the attempt */ /** @return the start time of the attempt */
public long getStartTime() { return startTime; } public long getStartTime() { return startTime; }
/** Get the finish time of the attempt */ /** @return the finish time of the attempt */
public long getFinishTime() { return finishTime; } public long getFinishTime() { return finishTime; }
/** Get the shuffle finish time. Applicable only for reduce attempts */ /** @return the shuffle finish time. Applicable only for reduce attempts */
public long getShuffleFinishTime() { return shuffleFinishTime; } public long getShuffleFinishTime() { return shuffleFinishTime; }
/** Get the sort finish time. Applicable only for reduce attempts */ /** @return the sort finish time. Applicable only for reduce attempts */
public long getSortFinishTime() { return sortFinishTime; } public long getSortFinishTime() { return sortFinishTime; }
/** Get the map finish time. Applicable only for map attempts */ /** @return the map finish time. Applicable only for map attempts */
public long getMapFinishTime() { return mapFinishTime; } public long getMapFinishTime() { return mapFinishTime; }
/** Get the error string */ /** @return the error string */
public String getError() { return error; } public String getError() { return error; }
/** Get the state */ /** @return the state */
public String getState() { return state; } public String getState() { return state; }
/** Get the task status */ /** @return the task status */
public String getTaskStatus() { return status; } public String getTaskStatus() { return status; }
/** Get the task type */ /** @return the task type */
public TaskType getTaskType() { return taskType; } public TaskType getTaskType() { return taskType; }
/** Get the tracker name where the attempt executed */ /** @return the tracker name where the attempt executed */
public String getTrackerName() { return trackerName; } public String getTrackerName() { return trackerName; }
/** Get the host name */ /** @return the host name */
public String getHostname() { return hostname; } public String getHostname() { return hostname; }
/** Get the counters for the attempt */ /** @return the counters for the attempt */
public Counters getCounters() { return counters; } public Counters getCounters() { return counters; }
/** Get the HTTP port for the tracker */ /** @return the HTTP port for the tracker */
public int getHttpPort() { return httpPort; } public int getHttpPort() { return httpPort; }
} }
} }

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskStatus;
@ -28,7 +26,6 @@ import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapred.ProgressSplitsBlock; import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.avro.util.Utf8; import org.apache.avro.util.Utf8;

View File

@ -87,7 +87,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
user = userName; user = userName;
counters = TypeConverter.toYarn(jobInfo.getTotalCounters()); counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
diagnostics.add(jobInfo.getErrorInfo()); diagnostics.add(jobInfo.getErrorInfo());
report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class); report =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
JobReport.class);
report.setJobId(jobId); report.setJobId(jobId);
report.setJobState(JobState.valueOf(jobInfo.getJobStatus())); report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
report.setStartTime(jobInfo.getLaunchTime()); report.setStartTime(jobInfo.getLaunchTime());
@ -194,11 +196,12 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
int attemptRunTime = -1; int attemptRunTime = -1;
if (taskAttempt.getLaunchTime() != 0 && taskAttempt.getFinishTime() != 0) { if (taskAttempt.getLaunchTime() != 0 && taskAttempt.getFinishTime() != 0) {
attemptRunTime = (int) (taskAttempt.getFinishTime() - taskAttempt attemptRunTime =
.getLaunchTime()); (int) (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
} }
// Default to KILLED // Default to KILLED
TaskAttemptCompletionEventStatus taceStatus = TaskAttemptCompletionEventStatus.KILLED; TaskAttemptCompletionEventStatus taceStatus =
TaskAttemptCompletionEventStatus.KILLED;
String taStateString = taskAttempt.getState().toString(); String taStateString = taskAttempt.getState().toString();
try { try {
taceStatus = TaskAttemptCompletionEventStatus.valueOf(taStateString); taceStatus = TaskAttemptCompletionEventStatus.valueOf(taStateString);
@ -224,7 +227,8 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
} }
//History data is leisurely loaded when task level data is requested //History data is leisurely loaded when task level data is requested
private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) throws IOException { private synchronized void loadFullHistoryData(boolean loadTasks,
Path historyFileAbsolute) throws IOException {
LOG.info("Loading history file: [" + historyFileAbsolute + "]"); LOG.info("Loading history file: [" + historyFileAbsolute + "]");
if (jobInfo != null) { if (jobInfo != null) {
return; //data already loaded return; //data already loaded
@ -232,11 +236,13 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
if (historyFileAbsolute != null) { if (historyFileAbsolute != null) {
try { try {
JobHistoryParser parser = new JobHistoryParser(historyFileAbsolute.getFileSystem(conf), historyFileAbsolute); JobHistoryParser parser =
new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
historyFileAbsolute);
jobInfo = parser.parse(); jobInfo = parser.parse();
} catch (IOException e) { } catch (IOException e) {
throw new YarnException("Could not load history file " + historyFileAbsolute, throw new YarnException("Could not load history file "
e); + historyFileAbsolute, e);
} }
} else { } else {
throw new IOException("History file not found"); throw new IOException("History file not found");
@ -295,7 +301,8 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
} }
@Override @Override
public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) { public
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
if (!UserGroupInformation.isSecurityEnabled()) { if (!UserGroupInformation.isSecurityEnabled()) {
return true; return true;
} }

View File

@ -71,6 +71,8 @@ public class CompletedTaskAttempt implements TaskAttempt {
report.setStartTime(attemptInfo.getStartTime()); report.setStartTime(attemptInfo.getStartTime());
report.setFinishTime(attemptInfo.getFinishTime()); report.setFinishTime(attemptInfo.getFinishTime());
report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
report.setSortFinishTime(attemptInfo.getSortFinishTime());
if (localDiagMessage != null) { if (localDiagMessage != null) {
report.setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage); report.setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
} else { } else {
@ -159,9 +161,18 @@ public class CompletedTaskAttempt implements TaskAttempt {
return report.getFinishTime(); return report.getFinishTime();
} }
@Override
public long getShuffleFinishTime() {
return report.getShuffleFinishTime();
}
@Override
public long getSortFinishTime() {
return report.getSortFinishTime();
}
@Override @Override
public int getShufflePort() { public int getShufflePort() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
} }

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -55,6 +56,12 @@ public class HsJobBlock extends HtmlBlock {
int killedReduceAttempts = 0; int killedReduceAttempts = 0;
int failedReduceAttempts = 0; int failedReduceAttempts = 0;
int successfulReduceAttempts = 0; int successfulReduceAttempts = 0;
long avgMapTime = 0;
long avgReduceTime = 0;
long avgShuffleTime = 0;
long avgSortTime = 0;
int numMaps;
int numReduces;
@Inject HsJobBlock(AppContext appctx) { @Inject HsJobBlock(AppContext appctx) {
appContext = appctx; appContext = appctx;
@ -96,7 +103,7 @@ public class HsJobBlock extends HtmlBlock {
_("Started:", new Date(startTime)). _("Started:", new Date(startTime)).
_("Finished:", new Date(finishTime)). _("Finished:", new Date(finishTime)).
_("Elapsed:", StringUtils.formatTime( _("Elapsed:", StringUtils.formatTime(
Times.elapsed(startTime, finishTime))); Times.elapsed(startTime, finishTime, false)));
List<String> diagnostics = job.getDiagnostics(); List<String> diagnostics = job.getDiagnostics();
if(diagnostics != null && !diagnostics.isEmpty()) { if(diagnostics != null && !diagnostics.isEmpty()) {
@ -107,6 +114,15 @@ public class HsJobBlock extends HtmlBlock {
infoBlock._("Diagnostics:", b.toString()); infoBlock._("Diagnostics:", b.toString());
} }
if(numMaps > 0) {
infoBlock._("Average Map Time", StringUtils.formatTime(avgMapTime));
}
if(numReduces > 0) {
infoBlock._("Average Reduce Time", StringUtils.formatTime(avgReduceTime));
infoBlock._("Average Shuffle Time", StringUtils.formatTime(avgShuffleTime));
infoBlock._("Average Merge Time", StringUtils.formatTime(avgSortTime));
}
for(Map.Entry<JobACL, AccessControlList> entry : acls.entrySet()) { for(Map.Entry<JobACL, AccessControlList> entry : acls.entrySet()) {
infoBlock._("ACL "+entry.getKey().getAclName()+":", infoBlock._("ACL "+entry.getKey().getAclName()+":",
entry.getValue().getAclString()); entry.getValue().getAclString());
@ -174,6 +190,8 @@ public class HsJobBlock extends HtmlBlock {
* @param job the job to get counts for. * @param job the job to get counts for.
*/ */
private void countTasksAndAttempts(Job job) { private void countTasksAndAttempts(Job job) {
numReduces = 0;
numMaps = 0;
Map<TaskId, Task> tasks = job.getTasks(); Map<TaskId, Task> tasks = job.getTasks();
for (Task task : tasks.values()) { for (Task task : tasks.values()) {
// Attempts counts // Attempts counts
@ -203,14 +221,38 @@ public class HsJobBlock extends HtmlBlock {
successfulMapAttempts += successful; successfulMapAttempts += successful;
failedMapAttempts += failed; failedMapAttempts += failed;
killedMapAttempts += killed; killedMapAttempts += killed;
if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
numMaps++;
avgMapTime += (attempt.getFinishTime() -
attempt.getLaunchTime());
}
break; break;
case REDUCE: case REDUCE:
successfulReduceAttempts += successful; successfulReduceAttempts += successful;
failedReduceAttempts += failed; failedReduceAttempts += failed;
killedReduceAttempts += killed; killedReduceAttempts += killed;
if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
numReduces++;
avgShuffleTime += (attempt.getShuffleFinishTime() -
attempt.getLaunchTime());
avgSortTime += attempt.getSortFinishTime() -
attempt.getLaunchTime();
avgReduceTime += (attempt.getFinishTime() -
attempt.getShuffleFinishTime());
}
break; break;
} }
} }
} }
if(numMaps > 0) {
avgMapTime = avgMapTime / numMaps;
}
if(numReduces > 0) {
avgReduceTime = avgReduceTime / numReduces;
avgShuffleTime = avgShuffleTime / numReduces;
avgSortTime = avgSortTime / numReduces;
}
} }
} }

View File

@ -18,27 +18,32 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp; package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection; import java.util.Collection;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App; import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.SubView; import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TFOOT;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@ -67,45 +72,160 @@ public class HsTaskPage extends HsView {
h2($(TITLE)); h2($(TITLE));
return; return;
} }
TBODY<TABLE<Hamlet>> tbody = html. TaskType type = null;
String symbol = $(TASK_TYPE);
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
} else {
type = app.getTask().getType();
}
TR<THEAD<TABLE<Hamlet>>> headRow = html.
table("#attempts"). table("#attempts").
thead(). thead().
tr(). tr();
headRow.
th(".id", "Attempt"). th(".id", "Attempt").
th(".state", "State"). th(".state", "State").
th(".node", "node"). th(".node", "node").
th(".tsh", "Started"). th(".tsh", "Start Time");
th(".tsh", "Finished").
th(".tsh", "Elapsed"). if(type == TaskType.REDUCE) {
th(".note", "Note")._()._(). headRow.th("Shuffle Finish Time");
tbody(); headRow.th("Merge Finish Time");
}
headRow.th("Finish Time"); //Attempt
if(type == TaskType.REDUCE) {
headRow.th("Elapsed Time Shuffle"); //Attempt
headRow.th("Elapsed Time Merge"); //Attempt
headRow.th("Elapsed Time Reduce"); //Attempt
}
headRow.th("Elapsed Time").
th(".note", "Note");
TBODY<TABLE<Hamlet>> tbody = headRow._()._().tbody();
for (TaskAttempt ta : getTaskAttempts()) { for (TaskAttempt ta : getTaskAttempts()) {
String taid = MRApps.toString(ta.getID()); String taid = MRApps.toString(ta.getID());
ContainerId containerId = ta.getAssignedContainerID();
String nodeHttpAddr = ta.getNodeHttpAddress(); String nodeHttpAddr = ta.getNodeHttpAddress();
long startTime = ta.getLaunchTime();
long finishTime = ta.getFinishTime(); long attemptStartTime = ta.getLaunchTime();
long elapsed = Times.elapsed(startTime, finishTime); long shuffleFinishTime = -1;
TD<TR<TBODY<TABLE<Hamlet>>>> nodeTd = tbody. long sortFinishTime = -1;
tr(). long attemptFinishTime = ta.getFinishTime();
td(".id", taid). long elapsedShuffleTime = -1;
td(".state", ta.getState().toString()). long elapsedSortTime = -1;
long elapsedReduceTime = -1;
if(type == TaskType.REDUCE) {
shuffleFinishTime = ta.getShuffleFinishTime();
sortFinishTime = ta.getSortFinishTime();
elapsedShuffleTime =
Times.elapsed(attemptStartTime, shuffleFinishTime, false);
elapsedSortTime =
Times.elapsed(shuffleFinishTime, sortFinishTime, false);
elapsedReduceTime =
Times.elapsed(sortFinishTime, attemptFinishTime, false);
}
long attemptElapsed =
Times.elapsed(attemptStartTime, attemptFinishTime, false);
int sortId = ta.getID().getId() + (ta.getID().getTaskId().getId() * 10000);
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
row.
td(). td().
a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr); br().$title(String.valueOf(sortId))._(). // sorting
if (containerId != null) { _(taid)._().
String containerIdStr = ConverterUtils.toString(containerId); td(ta.getState().toString()).
nodeTd._(" "). td().a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr)._();
a(".logslink", url("http://", nodeHttpAddr, "yarn", "containerlogs",
containerIdStr), "logs"); row.td().
br().$title(String.valueOf(attemptStartTime))._().
_(Times.format(attemptStartTime))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(shuffleFinishTime))._().
_(Times.format(shuffleFinishTime))._();
row.td().
br().$title(String.valueOf(sortFinishTime))._().
_(Times.format(sortFinishTime))._();
} }
nodeTd._(). row.
td(".ts", Times.format(startTime)). td().
td(".ts", Times.format(finishTime)). br().$title(String.valueOf(attemptFinishTime))._().
td(".dt", StringUtils.formatTime(elapsed)). _(Times.format(attemptFinishTime))._();
td(".note", Joiner.on('\n').join(ta.getDiagnostics()))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(elapsedShuffleTime))._().
_(formatTime(elapsedShuffleTime))._();
row.td().
br().$title(String.valueOf(elapsedSortTime))._().
_(formatTime(elapsedSortTime))._();
row.td().
br().$title(String.valueOf(elapsedReduceTime))._().
_(formatTime(elapsedReduceTime))._();
} }
tbody._()._();
row.
td().
br().$title(String.valueOf(attemptElapsed))._().
_(formatTime(attemptElapsed))._().
td(".note", Joiner.on('\n').join(ta.getDiagnostics()));
row._();
}
TR<TFOOT<TABLE<Hamlet>>> footRow = tbody._().tfoot().tr();
footRow.
th().input("search_init").$type(InputType.text).
$name("attempt_name").$value("Attempt")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_state").$value("State")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_node").$value("Node")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_start_time").$value("Start Time")._()._();
if(type == TaskType.REDUCE) {
footRow.
th().input("search_init").$type(InputType.text).
$name("shuffle_time").$value("Shuffle Time")._()._();
footRow.
th().input("search_init").$type(InputType.text).
$name("merge_time").$value("Merge Time")._()._();
}
footRow.
th().input("search_init").$type(InputType.text).
$name("attempt_finish").$value("Finish Time")._()._();
if(type == TaskType.REDUCE) {
footRow.
th().input("search_init").$type(InputType.text).
$name("elapsed_shuffle_time").$value("Elapsed Shuffle Time")._()._();
footRow.
th().input("search_init").$type(InputType.text).
$name("elapsed_merge_time").$value("Elapsed Merge Time")._()._();
footRow.
th().input("search_init").$type(InputType.text).
$name("elapsed_reduce_time").$value("Elapsed Reduce Time")._()._();
}
footRow.
th().input("search_init").$type(InputType.text).
$name("attempt_elapsed").$value("Elapsed Time")._()._().
th().input("search_init").$type(InputType.text).
$name("note").$value("Note")._()._();
footRow._()._()._();
}
private String formatTime(long elapsed) {
return elapsed < 0 ? "N/A" : StringUtils.formatTime(elapsed);
} }
/** /**
@ -134,6 +254,7 @@ public class HsTaskPage extends HsView {
//Set up the java script and CSS for the attempts table //Set up the java script and CSS for the attempts table
set(DATATABLES_ID, "attempts"); set(DATATABLES_ID, "attempts");
set(initID(DATATABLES, "attempts"), attemptsTableInit()); set(initID(DATATABLES, "attempts"), attemptsTableInit());
set(postInitID(DATATABLES, "attempts"), attemptsPostTableInit());
setTableStyles(html, "attempts"); setTableStyles(html, "attempts");
} }
@ -150,6 +271,49 @@ public class HsTaskPage extends HsView {
* attempts table. * attempts table.
*/ */
private String attemptsTableInit() { private String attemptsTableInit() {
return tableInit().append("}").toString(); TaskType type = null;
String symbol = $(TASK_TYPE);
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
} else {
TaskId taskID = MRApps.toTaskID($(TASK_ID));
type = taskID.getTaskType();
}
StringBuilder b = tableInit().
append(",aoColumnDefs:[");
b.append("{'sType':'title-numeric', 'aTargets': [ 0");
if(type == TaskType.REDUCE) {
b.append(", 7, 8, 9, 10");
} else { //MAP
b.append(", 5");
}
b.append(" ] }");
b.append("]}");
return b.toString();
}
private String attemptsPostTableInit() {
return "var asInitVals = new Array();\n" +
"$('tfoot input').keyup( function () \n{"+
" attemptsDataTable.fnFilter( this.value, $('tfoot input').index(this) );\n"+
"} );\n"+
"$('tfoot input').each( function (i) {\n"+
" asInitVals[i] = this.value;\n"+
"} );\n"+
"$('tfoot input').focus( function () {\n"+
" if ( this.className == 'search_init' )\n"+
" {\n"+
" this.className = '';\n"+
" this.value = '';\n"+
" }\n"+
"} );\n"+
"$('tfoot input').blur( function (i) {\n"+
" if ( this.value == '' )\n"+
" {\n"+
" this.className = 'search_init';\n"+
" this.value = asInitVals[$('tfoot input').index(this)];\n"+
" }\n"+
"} );\n";
} }
} }

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE; import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App; import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -30,6 +32,10 @@ import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TFOOT;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -59,27 +65,79 @@ public class HsTasksBlock extends HtmlBlock {
if (!symbol.isEmpty()) { if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol); type = MRApps.taskType(symbol);
} }
TBODY<TABLE<Hamlet>> tbody = html.
table("#tasks"). THEAD<TABLE<Hamlet>> thead = html.table("#tasks").thead();
thead(). //Create the spanning row
int attemptColSpan = type == TaskType.REDUCE ? 8 : 3;
thead.tr().
th().$colspan(5).$class("ui-state-default")._("Task")._().
th().$colspan(attemptColSpan).$class("ui-state-default").
_("Successful Attempt")._().
_();
TR<THEAD<TABLE<Hamlet>>> theadRow = thead.
tr(). tr().
th("Task"). th("Name").
th("State"). th("State").
th("Start Time"). th("Start Time").
th("Finish Time"). th("Finish Time").
th("Elapsed Time")._()._(). th("Elapsed Time").
tbody(); th("Start Time"); //Attempt
if(type == TaskType.REDUCE) {
theadRow.th("Shuffle Finish Time"); //Attempt
theadRow.th("Merge Finish Time"); //Attempt
}
theadRow.th("Finish Time"); //Attempt
if(type == TaskType.REDUCE) {
theadRow.th("Elapsed Time Shuffle"); //Attempt
theadRow.th("Elapsed Time Merge"); //Attempt
theadRow.th("Elapsed Time Reduce"); //Attempt
}
theadRow.th("Elapsed Time"); //Attempt
TBODY<TABLE<Hamlet>> tbody = theadRow._()._().tbody();
for (Task task : app.getJob().getTasks().values()) { for (Task task : app.getJob().getTasks().values()) {
if (type != null && task.getType() != type) { if (type != null && task.getType() != type) {
continue; continue;
} }
String tid = MRApps.toString(task.getID()); String tid = MRApps.toString(task.getID());
TaskReport report = task.getReport(); TaskReport report = task.getReport();
long startTime = report.getStartTime(); long startTime = report.getStartTime();
long finishTime = report.getFinishTime(); long finishTime = report.getFinishTime();
long elapsed = Times.elapsed(startTime, finishTime); long elapsed = Times.elapsed(startTime, finishTime, false);
tbody.
tr(). long attemptStartTime = -1;
long shuffleFinishTime = -1;
long sortFinishTime = -1;
long attemptFinishTime = -1;
long elapsedShuffleTime = -1;
long elapsedSortTime = -1;;
long elapsedReduceTime = -1;
long attemptElapsed = -1;
TaskAttempt successful = getSuccessfulAttempt(task);
if(successful != null) {
attemptStartTime = successful.getLaunchTime();
attemptFinishTime = successful.getFinishTime();
if(type == TaskType.REDUCE) {
shuffleFinishTime = successful.getShuffleFinishTime();
sortFinishTime = successful.getSortFinishTime();
elapsedShuffleTime =
Times.elapsed(attemptStartTime, shuffleFinishTime, false);
elapsedSortTime =
Times.elapsed(shuffleFinishTime, sortFinishTime, false);
elapsedReduceTime =
Times.elapsed(sortFinishTime, attemptFinishTime, false);
}
attemptElapsed =
Times.elapsed(attemptStartTime, attemptFinishTime, false);
}
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
row.
td(). td().
br().$title(String.valueOf(task.getID().getId()))._(). // sorting br().$title(String.valueOf(task.getID().getId()))._(). // sorting
a(url("task", tid), tid)._(). a(url("task", tid), tid)._().
@ -92,8 +150,86 @@ public class HsTasksBlock extends HtmlBlock {
_(Times.format(finishTime))._(). _(Times.format(finishTime))._().
td(). td().
br().$title(String.valueOf(elapsed))._(). br().$title(String.valueOf(elapsed))._().
_(StringUtils.formatTime(elapsed))._()._(); _(formatTime(elapsed))._().
td().
br().$title(String.valueOf(attemptStartTime))._().
_(Times.format(attemptStartTime))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(shuffleFinishTime))._().
_(Times.format(shuffleFinishTime))._();
row.td().
br().$title(String.valueOf(sortFinishTime))._().
_(Times.format(sortFinishTime))._();
} }
tbody._()._(); row.
td().
br().$title(String.valueOf(attemptFinishTime))._().
_(Times.format(attemptFinishTime))._();
if(type == TaskType.REDUCE) {
row.td().
br().$title(String.valueOf(elapsedShuffleTime))._().
_(formatTime(elapsedShuffleTime))._();
row.td().
br().$title(String.valueOf(elapsedSortTime))._().
_(formatTime(elapsedSortTime))._();
row.td().
br().$title(String.valueOf(elapsedReduceTime))._().
_(formatTime(elapsedReduceTime))._();
}
row.td().
br().$title(String.valueOf(attemptElapsed))._().
_(formatTime(attemptElapsed))._();
row._();
}
TR<TFOOT<TABLE<Hamlet>>> footRow = tbody._().tfoot().tr();
footRow.th().input("search_init").$type(InputType.text).$name("task")
.$value("ID")._()._().th().input("search_init").$type(InputType.text)
.$name("state").$value("State")._()._().th().input("search_init")
.$type(InputType.text).$name("start_time").$value("Start Time")._()._()
.th().input("search_init").$type(InputType.text).$name("finish_time")
.$value("Finish Time")._()._().th().input("search_init")
.$type(InputType.text).$name("elapsed_time").$value("Elapsed Time")._()
._().th().input("search_init").$type(InputType.text)
.$name("attempt_start_time").$value("Start Time")._()._();
if(type == TaskType.REDUCE) {
footRow.th().input("search_init").$type(InputType.text)
.$name("shuffle_time").$value("Shuffle Time")._()._();
footRow.th().input("search_init").$type(InputType.text)
.$name("merge_time").$value("Merge Time")._()._();
}
footRow.th().input("search_init").$type(InputType.text)
.$name("attempt_finish").$value("Finish Time")._()._();
if(type == TaskType.REDUCE) {
footRow.th().input("search_init").$type(InputType.text)
.$name("elapsed_shuffle_time").$value("Elapsed Shuffle Time")._()._();
footRow.th().input("search_init").$type(InputType.text)
.$name("elapsed_merge_time").$value("Elapsed Merge Time")._()._();
footRow.th().input("search_init").$type(InputType.text)
.$name("elapsed_reduce_time").$value("Elapsed Reduce Time")._()._();
}
footRow.th().input("search_init").$type(InputType.text)
.$name("attempt_elapsed").$value("Elapsed Time")._()._();
footRow._()._()._();
}
private String formatTime(long elapsed) {
return elapsed < 0 ? "N/A" : StringUtils.formatTime(elapsed);
}
private TaskAttempt getSuccessfulAttempt(Task task) {
for(TaskAttempt attempt: task.getAttempts().values()) {
if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
return attempt;
}
}
return null;
} }
} }

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp; package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.SubView; import org.apache.hadoop.yarn.webapp.SubView;
/** /**
@ -40,6 +44,7 @@ public class HsTasksPage extends HsView {
set(DATATABLES_ID, "tasks"); set(DATATABLES_ID, "tasks");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}"); set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
set(initID(DATATABLES, "tasks"), tasksTableInit()); set(initID(DATATABLES, "tasks"), tasksTableInit());
set(postInitID(DATATABLES, "tasks"), jobsPostTableInit());
setTableStyles(html, "tasks"); setTableStyles(html, "tasks");
} }
@ -56,9 +61,45 @@ public class HsTasksPage extends HsView {
* for the tasks table. * for the tasks table.
*/ */
private String tasksTableInit() { private String tasksTableInit() {
return tableInit(). TaskType type = null;
append(",aoColumns:[{sType:'title-numeric'},{sType:'title-numeric',"). String symbol = $(TASK_TYPE);
append("bSearchable:false},null,{sType:'title-numeric'},"). if (!symbol.isEmpty()) {
append("{sType:'title-numeric'},{sType:'title-numeric'}]}").toString(); type = MRApps.taskType(symbol);
}
StringBuilder b = tableInit().
append(",aoColumnDefs:[");
b.append("{'sType':'title-numeric', 'aTargets': [ 0, 4");
if(type == TaskType.REDUCE) {
b.append(", 9, 10, 11, 12");
} else { //MAP
b.append(", 7");
}
b.append(" ] }");
b.append("]}");
return b.toString();
}
private String jobsPostTableInit() {
return "var asInitVals = new Array();\n" +
"$('tfoot input').keyup( function () \n{"+
" tasksDataTable.fnFilter( this.value, $('tfoot input').index(this) );\n"+
"} );\n"+
"$('tfoot input').each( function (i) {\n"+
" asInitVals[i] = this.value;\n"+
"} );\n"+
"$('tfoot input').focus( function () {\n"+
" if ( this.className == 'search_init' )\n"+
" {\n"+
" this.className = '';\n"+
" this.value = '';\n"+
" }\n"+
"} );\n"+
"$('tfoot input').blur( function (i) {\n"+
" if ( this.value == '' )\n"+
" {\n"+
" this.className = 'search_init';\n"+
" this.value = asInitVals[$('tfoot input').index(this)];\n"+
" }\n"+
"} );\n";
} }
} }

View File

@ -26,13 +26,18 @@ import static org.junit.Assert.assertEquals;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
import org.apache.hadoop.mapreduce.v2.app.webapp.TestAMWebApp;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -118,19 +123,27 @@ public class TestHSWebApp {
@Test public void testJobView() { @Test public void testJobView() {
LOG.info("HsJobPage"); LOG.info("HsJobPage");
WebAppTests.testPage(HsJobPage.class, AppContext.class, new TestAppContext()); AppContext appContext = new TestAppContext();
Map<String, String> params = TestAMWebApp.getJobParams(appContext);
WebAppTests.testPage(HsJobPage.class, AppContext.class, appContext, params);
} }
@Test public void testTasksView() { @Test
public void testTasksView() {
LOG.info("HsTasksPage"); LOG.info("HsTasksPage");
WebAppTests.testPage(HsTasksPage.class, AppContext.class, AppContext appContext = new TestAppContext();
new TestAppContext()); Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
WebAppTests.testPage(HsTasksPage.class, AppContext.class, appContext,
params);
} }
@Test public void testTaskView() { @Test
public void testTaskView() {
LOG.info("HsTaskPage"); LOG.info("HsTaskPage");
WebAppTests.testPage(HsTaskPage.class, AppContext.class, AppContext appContext = new TestAppContext();
new TestAppContext()); Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
WebAppTests
.testPage(HsTaskPage.class, AppContext.class, appContext, params);
} }
@Test public void testAttemptsWithJobView() { @Test public void testAttemptsWithJobView() {
@ -147,8 +160,10 @@ public class TestHSWebApp {
@Test public void testAttemptsView() { @Test public void testAttemptsView() {
LOG.info("HsAttemptsPage"); LOG.info("HsAttemptsPage");
AppContext appContext = new TestAppContext();
Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
WebAppTests.testPage(HsAttemptsPage.class, AppContext.class, WebAppTests.testPage(HsAttemptsPage.class, AppContext.class,
new TestAppContext()); appContext, params);
} }
@Test public void testConfView() { @Test public void testConfView() {

View File

@ -30,10 +30,18 @@ public class Times {
}; };
public static long elapsed(long started, long finished) { public static long elapsed(long started, long finished) {
return Times.elapsed(started, finished, true);
}
public static long elapsed(long started, long finished, boolean isRunning) {
if (finished > 0) { if (finished > 0) {
return finished - started; return finished - started;
} }
if (isRunning) {
return started > 0 ? System.currentTimeMillis() - started : 0; return started > 0 ? System.currentTimeMillis() - started : 0;
} else {
return -1;
}
} }
public static String format(long ts) { public static String format(long ts) {