MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1552808 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
17a3d72d3b
commit
04dac63695
|
@ -190,6 +190,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
|
MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
|
||||||
to speculative execution (Gera Shegalov via Sandy Ryza)
|
to speculative execution (Gera Shegalov via Sandy Ryza)
|
||||||
|
|
||||||
|
MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
|
||||||
|
with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
|
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
|
||||||
|
|
|
@ -376,11 +376,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
||||||
TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
|
TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
|
TaskAttempt bestAttempt = selectBestAttempt();
|
||||||
report.setTaskId(taskId);
|
report.setTaskId(taskId);
|
||||||
report.setStartTime(getLaunchTime());
|
report.setStartTime(getLaunchTime());
|
||||||
report.setFinishTime(getFinishTime());
|
report.setFinishTime(getFinishTime());
|
||||||
report.setTaskState(getState());
|
report.setTaskState(getState());
|
||||||
report.setProgress(getProgress());
|
report.setProgress(bestAttempt == null ? 0f : bestAttempt.getProgress());
|
||||||
|
report.setStatus(bestAttempt == null
|
||||||
|
? ""
|
||||||
|
: bestAttempt.getReport().getStateString());
|
||||||
|
|
||||||
for (TaskAttempt attempt : attempts.values()) {
|
for (TaskAttempt attempt : attempts.values()) {
|
||||||
if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
|
if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
|
||||||
|
@ -400,7 +404,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
||||||
|
|
||||||
// Add a copy of counters as the last step so that their lifetime on heap
|
// Add a copy of counters as the last step so that their lifetime on heap
|
||||||
// is as small as possible.
|
// is as small as possible.
|
||||||
report.setCounters(TypeConverter.toYarn(getCounters()));
|
report.setCounters(TypeConverter.toYarn(bestAttempt == null
|
||||||
|
? TaskAttemptImpl.EMPTY_COUNTERS
|
||||||
|
: bestAttempt.getCounters()));
|
||||||
|
|
||||||
return report;
|
return report;
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -63,6 +63,7 @@ public class TaskPage extends AppView {
|
||||||
th(".id", "Attempt").
|
th(".id", "Attempt").
|
||||||
th(".progress", "Progress").
|
th(".progress", "Progress").
|
||||||
th(".state", "State").
|
th(".state", "State").
|
||||||
|
th(".status", "Status").
|
||||||
th(".node", "Node").
|
th(".node", "Node").
|
||||||
th(".logs", "Logs").
|
th(".logs", "Logs").
|
||||||
th(".tsh", "Started").
|
th(".tsh", "Started").
|
||||||
|
@ -84,6 +85,7 @@ public class TaskPage extends AppView {
|
||||||
.append(ta.getId()).append("\",\"")
|
.append(ta.getId()).append("\",\"")
|
||||||
.append(progress).append("\",\"")
|
.append(progress).append("\",\"")
|
||||||
.append(ta.getState().toString()).append("\",\"")
|
.append(ta.getState().toString()).append("\",\"")
|
||||||
|
.append(ta.getStatus()).append("\",\"")
|
||||||
|
|
||||||
.append(nodeHttpAddr == null ? "N/A" :
|
.append(nodeHttpAddr == null ? "N/A" :
|
||||||
"<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>"
|
"<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>"
|
||||||
|
@ -144,13 +146,13 @@ public class TaskPage extends AppView {
|
||||||
.append("\n,aoColumnDefs:[\n")
|
.append("\n,aoColumnDefs:[\n")
|
||||||
|
|
||||||
//logs column should not filterable (it includes container ID which may pollute searches)
|
//logs column should not filterable (it includes container ID which may pollute searches)
|
||||||
.append("\n{'aTargets': [ 4 ]")
|
.append("\n{'aTargets': [ 5 ]")
|
||||||
.append(", 'bSearchable': false }")
|
.append(", 'bSearchable': false }")
|
||||||
|
|
||||||
.append("\n, {'sType':'numeric', 'aTargets': [ 5, 6")
|
.append("\n, {'sType':'numeric', 'aTargets': [ 6, 7")
|
||||||
.append(" ], 'mRender': renderHadoopDate }")
|
.append(" ], 'mRender': renderHadoopDate }")
|
||||||
|
|
||||||
.append("\n, {'sType':'numeric', 'aTargets': [ 7")
|
.append("\n, {'sType':'numeric', 'aTargets': [ 8")
|
||||||
.append(" ], 'mRender': renderHadoopElapsedTime }]")
|
.append(" ], 'mRender': renderHadoopElapsedTime }]")
|
||||||
|
|
||||||
// Sort by id upon page load
|
// Sort by id upon page load
|
||||||
|
|
|
@ -59,6 +59,7 @@ public class TasksBlock extends HtmlBlock {
|
||||||
tr().
|
tr().
|
||||||
th("Task").
|
th("Task").
|
||||||
th("Progress").
|
th("Progress").
|
||||||
|
th("Status").
|
||||||
th("State").
|
th("State").
|
||||||
th("Start Time").
|
th("Start Time").
|
||||||
th("Finish Time").
|
th("Finish Time").
|
||||||
|
@ -81,6 +82,7 @@ public class TasksBlock extends HtmlBlock {
|
||||||
.append(join(pct, '%')).append("'> ").append("<div class='")
|
.append(join(pct, '%')).append("'> ").append("<div class='")
|
||||||
.append(C_PROGRESSBAR_VALUE).append("' style='")
|
.append(C_PROGRESSBAR_VALUE).append("' style='")
|
||||||
.append(join("width:", pct, '%')).append("'> </div> </div>\",\"")
|
.append(join("width:", pct, '%')).append("'> </div> </div>\",\"")
|
||||||
|
.append(info.getStatus()).append("\",\"")
|
||||||
|
|
||||||
.append(info.getState()).append("\",\"")
|
.append(info.getState()).append("\",\"")
|
||||||
.append(info.getStartTime()).append("\",\"")
|
.append(info.getStartTime()).append("\",\"")
|
||||||
|
|
|
@ -50,10 +50,10 @@ public class TasksPage extends AppView {
|
||||||
.append(", 'mRender': parseHadoopProgress }")
|
.append(", 'mRender': parseHadoopProgress }")
|
||||||
|
|
||||||
|
|
||||||
.append("\n, {'sType':'numeric', 'aTargets': [3, 4]")
|
.append("\n, {'sType':'numeric', 'aTargets': [4, 5]")
|
||||||
.append(", 'mRender': renderHadoopDate }")
|
.append(", 'mRender': renderHadoopDate }")
|
||||||
|
|
||||||
.append("\n, {'sType':'numeric', 'aTargets': [5]")
|
.append("\n, {'sType':'numeric', 'aTargets': [6]")
|
||||||
.append(", 'mRender': renderHadoopElapsedTime }]")
|
.append(", 'mRender': renderHadoopElapsedTime }]")
|
||||||
|
|
||||||
// Sort by id upon page load
|
// Sort by id upon page load
|
||||||
|
|
|
@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlRootElement;
|
||||||
import javax.xml.bind.annotation.XmlSeeAlso;
|
import javax.xml.bind.annotation.XmlSeeAlso;
|
||||||
import javax.xml.bind.annotation.XmlTransient;
|
import javax.xml.bind.annotation.XmlTransient;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
|
@ -45,6 +46,7 @@ public class TaskAttemptInfo {
|
||||||
protected String id;
|
protected String id;
|
||||||
protected String rack;
|
protected String rack;
|
||||||
protected TaskAttemptState state;
|
protected TaskAttemptState state;
|
||||||
|
protected String status;
|
||||||
protected String nodeHttpAddress;
|
protected String nodeHttpAddress;
|
||||||
protected String diagnostics;
|
protected String diagnostics;
|
||||||
protected String type;
|
protected String type;
|
||||||
|
@ -61,29 +63,23 @@ public class TaskAttemptInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
|
public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
|
||||||
|
final TaskAttemptReport report = ta.getReport();
|
||||||
this.type = type.toString();
|
this.type = type.toString();
|
||||||
this.id = MRApps.toString(ta.getID());
|
this.id = MRApps.toString(ta.getID());
|
||||||
this.nodeHttpAddress = ta.getNodeHttpAddress();
|
this.nodeHttpAddress = ta.getNodeHttpAddress();
|
||||||
this.startTime = ta.getLaunchTime();
|
this.startTime = report.getStartTime();
|
||||||
this.finishTime = ta.getFinishTime();
|
this.finishTime = report.getFinishTime();
|
||||||
this.assignedContainerId = ConverterUtils.toString(ta
|
this.assignedContainerId = ConverterUtils.toString(report.getContainerId());
|
||||||
.getAssignedContainerID());
|
this.assignedContainer = report.getContainerId();
|
||||||
this.assignedContainer = ta.getAssignedContainerID();
|
this.progress = report.getProgress() * 100;
|
||||||
this.progress = ta.getProgress() * 100;
|
this.status = report.getStateString();
|
||||||
this.state = ta.getState();
|
this.state = report.getTaskAttemptState();
|
||||||
this.elapsedTime = Times
|
this.elapsedTime = Times
|
||||||
.elapsed(this.startTime, this.finishTime, isRunning);
|
.elapsed(this.startTime, this.finishTime, isRunning);
|
||||||
if (this.elapsedTime == -1) {
|
if (this.elapsedTime == -1) {
|
||||||
this.elapsedTime = 0;
|
this.elapsedTime = 0;
|
||||||
}
|
}
|
||||||
List<String> diagnostics = ta.getDiagnostics();
|
this.diagnostics = report.getDiagnosticInfo();
|
||||||
if (diagnostics != null && !diagnostics.isEmpty()) {
|
|
||||||
StringBuffer b = new StringBuffer();
|
|
||||||
for (String diag : diagnostics) {
|
|
||||||
b.append(diag);
|
|
||||||
}
|
|
||||||
this.diagnostics = b.toString();
|
|
||||||
}
|
|
||||||
this.rack = ta.getNodeRackName();
|
this.rack = ta.getNodeRackName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,6 +95,10 @@ public class TaskAttemptInfo {
|
||||||
return this.state.toString();
|
return this.state.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
public String getId() {
|
public String getId() {
|
||||||
return this.id;
|
return this.id;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ public class TaskInfo {
|
||||||
protected TaskState state;
|
protected TaskState state;
|
||||||
protected String type;
|
protected String type;
|
||||||
protected String successfulAttempt;
|
protected String successfulAttempt;
|
||||||
|
protected String status;
|
||||||
|
|
||||||
@XmlTransient
|
@XmlTransient
|
||||||
int taskNum;
|
int taskNum;
|
||||||
|
@ -66,6 +67,7 @@ public class TaskInfo {
|
||||||
this.elapsedTime = 0;
|
this.elapsedTime = 0;
|
||||||
}
|
}
|
||||||
this.progress = report.getProgress() * 100;
|
this.progress = report.getProgress() * 100;
|
||||||
|
this.status = report.getStatus();
|
||||||
this.id = MRApps.toString(task.getID());
|
this.id = MRApps.toString(task.getID());
|
||||||
this.taskNum = task.getID().getId();
|
this.taskNum = task.getID().getId();
|
||||||
this.successful = getSuccessfulAttempt(task);
|
this.successful = getSuccessfulAttempt(task);
|
||||||
|
@ -121,4 +123,7 @@ public class TaskInfo {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -174,22 +174,37 @@ public class MockJobs extends MockApps {
|
||||||
report.setFinishTime(System.currentTimeMillis()
|
report.setFinishTime(System.currentTimeMillis()
|
||||||
+ (int) (Math.random() * DT) + 1);
|
+ (int) (Math.random() * DT) + 1);
|
||||||
report.setProgress((float) Math.random());
|
report.setProgress((float) Math.random());
|
||||||
|
report.setStatus("Moving average: " + Math.random());
|
||||||
report.setCounters(TypeConverter.toYarn(newCounters()));
|
report.setCounters(TypeConverter.toYarn(newCounters()));
|
||||||
report.setTaskState(TASK_STATES.next());
|
report.setTaskState(TASK_STATES.next());
|
||||||
return report;
|
return report;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
|
public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||||
|
id.getTaskId().getJobId().getAppId(), 0);
|
||||||
|
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
|
||||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||||
report.setTaskAttemptId(id);
|
report.setTaskAttemptId(id);
|
||||||
report
|
report
|
||||||
.setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
|
.setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
|
||||||
report.setFinishTime(System.currentTimeMillis()
|
report.setFinishTime(System.currentTimeMillis()
|
||||||
+ (int) (Math.random() * DT) + 1);
|
+ (int) (Math.random() * DT) + 1);
|
||||||
|
|
||||||
|
if (id.getTaskId().getTaskType() == TaskType.REDUCE) {
|
||||||
|
report.setShuffleFinishTime(
|
||||||
|
(report.getFinishTime() + report.getStartTime()) / 2);
|
||||||
|
report.setSortFinishTime(
|
||||||
|
(report.getFinishTime() + report.getShuffleFinishTime()) / 2);
|
||||||
|
}
|
||||||
|
|
||||||
report.setPhase(PHASES.next());
|
report.setPhase(PHASES.next());
|
||||||
report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
|
report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
|
||||||
report.setProgress((float) Math.random());
|
report.setProgress((float) Math.random());
|
||||||
report.setCounters(TypeConverter.toYarn(newCounters()));
|
report.setCounters(TypeConverter.toYarn(newCounters()));
|
||||||
|
report.setContainerId(containerId);
|
||||||
|
report.setDiagnosticInfo(DIAGS.next());
|
||||||
|
report.setStateString("Moving average " + Math.random());
|
||||||
return report;
|
return report;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,8 +245,6 @@ public class MockJobs extends MockApps {
|
||||||
taid.setTaskId(tid);
|
taid.setTaskId(tid);
|
||||||
taid.setId(i);
|
taid.setId(i);
|
||||||
final TaskAttemptReport report = newTaskAttemptReport(taid);
|
final TaskAttemptReport report = newTaskAttemptReport(taid);
|
||||||
final List<String> diags = Lists.newArrayList();
|
|
||||||
diags.add(DIAGS.next());
|
|
||||||
return new TaskAttempt() {
|
return new TaskAttempt() {
|
||||||
@Override
|
@Override
|
||||||
public NodeId getNodeId() throws UnsupportedOperationException{
|
public NodeId getNodeId() throws UnsupportedOperationException{
|
||||||
|
@ -250,12 +263,12 @@ public class MockJobs extends MockApps {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getLaunchTime() {
|
public long getLaunchTime() {
|
||||||
return 0;
|
return report.getStartTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getFinishTime() {
|
public long getFinishTime() {
|
||||||
return 0;
|
return report.getFinishTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -313,7 +326,7 @@ public class MockJobs extends MockApps {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> getDiagnostics() {
|
public List<String> getDiagnostics() {
|
||||||
return diags;
|
return Lists.newArrayList(report.getDiagnosticInfo());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -323,12 +336,12 @@ public class MockJobs extends MockApps {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getShuffleFinishTime() {
|
public long getShuffleFinishTime() {
|
||||||
return 0;
|
return report.getShuffleFinishTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getSortFinishTime() {
|
public long getSortFinishTime() {
|
||||||
return 0;
|
return report.getSortFinishTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
/**
|
||||||
/**
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -425,9 +426,9 @@ public class TestAMWebServicesAttempts extends JerseyTest {
|
||||||
public void verifyAMTaskAttempt(JSONObject info, TaskAttempt att,
|
public void verifyAMTaskAttempt(JSONObject info, TaskAttempt att,
|
||||||
TaskType ttype) throws JSONException {
|
TaskType ttype) throws JSONException {
|
||||||
if (ttype == TaskType.REDUCE) {
|
if (ttype == TaskType.REDUCE) {
|
||||||
assertEquals("incorrect number of elements", 16, info.length());
|
assertEquals("incorrect number of elements", 17, info.length());
|
||||||
} else {
|
} else {
|
||||||
assertEquals("incorrect number of elements", 11, info.length());
|
assertEquals("incorrect number of elements", 12, info.length());
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
|
verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
|
||||||
|
@ -532,11 +533,11 @@ public class TestAMWebServicesAttempts extends JerseyTest {
|
||||||
assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
|
assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
|
||||||
mergeFinishTime);
|
mergeFinishTime);
|
||||||
assertEquals("elapsedShuffleTime wrong",
|
assertEquals("elapsedShuffleTime wrong",
|
||||||
ta.getLaunchTime() - ta.getShuffleFinishTime(), elapsedShuffleTime);
|
ta.getShuffleFinishTime() - ta.getLaunchTime(), elapsedShuffleTime);
|
||||||
assertEquals("elapsedMergeTime wrong",
|
assertEquals("elapsedMergeTime wrong",
|
||||||
ta.getShuffleFinishTime() - ta.getSortFinishTime(), elapsedMergeTime);
|
ta.getSortFinishTime() - ta.getShuffleFinishTime(), elapsedMergeTime);
|
||||||
assertEquals("elapsedReduceTime wrong",
|
assertEquals("elapsedReduceTime wrong",
|
||||||
ta.getSortFinishTime() - ta.getFinishTime(), elapsedReduceTime);
|
ta.getFinishTime() - ta.getSortFinishTime(), elapsedReduceTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -525,12 +525,13 @@ public class TestAMWebServicesTasks extends JerseyTest {
|
||||||
|
|
||||||
public void verifyAMSingleTask(JSONObject info, Task task)
|
public void verifyAMSingleTask(JSONObject info, Task task)
|
||||||
throws JSONException {
|
throws JSONException {
|
||||||
assertEquals("incorrect number of elements", 8, info.length());
|
assertEquals("incorrect number of elements", 9, info.length());
|
||||||
|
|
||||||
verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
|
verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
|
||||||
info.getString("type"), info.getString("successfulAttempt"),
|
info.getString("type"), info.getString("successfulAttempt"),
|
||||||
info.getLong("startTime"), info.getLong("finishTime"),
|
info.getLong("startTime"), info.getLong("finishTime"),
|
||||||
info.getLong("elapsedTime"), (float) info.getDouble("progress"));
|
info.getLong("elapsedTime"), (float) info.getDouble("progress"),
|
||||||
|
info.getString("status"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void verifyAMTask(JSONArray arr, Job job, String type)
|
public void verifyAMTask(JSONArray arr, Job job, String type)
|
||||||
|
@ -555,7 +556,7 @@ public class TestAMWebServicesTasks extends JerseyTest {
|
||||||
|
|
||||||
public void verifyTaskGeneric(Task task, String id, String state,
|
public void verifyTaskGeneric(Task task, String id, String state,
|
||||||
String type, String successfulAttempt, long startTime, long finishTime,
|
String type, String successfulAttempt, long startTime, long finishTime,
|
||||||
long elapsedTime, float progress) {
|
long elapsedTime, float progress, String status) {
|
||||||
|
|
||||||
TaskId taskid = task.getID();
|
TaskId taskid = task.getID();
|
||||||
String tid = MRApps.toString(taskid);
|
String tid = MRApps.toString(taskid);
|
||||||
|
@ -572,6 +573,7 @@ public class TestAMWebServicesTasks extends JerseyTest {
|
||||||
assertEquals("finishTime wrong", report.getFinishTime(), finishTime);
|
assertEquals("finishTime wrong", report.getFinishTime(), finishTime);
|
||||||
assertEquals("elapsedTime wrong", finishTime - startTime, elapsedTime);
|
assertEquals("elapsedTime wrong", finishTime - startTime, elapsedTime);
|
||||||
assertEquals("progress wrong", report.getProgress() * 100, progress, 1e-3f);
|
assertEquals("progress wrong", report.getProgress() * 100, progress, 1e-3f);
|
||||||
|
assertEquals("status wrong", report.getStatus(), status);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void verifyAMSingleTaskXML(Element element, Task task) {
|
public void verifyAMSingleTaskXML(Element element, Task task) {
|
||||||
|
@ -582,7 +584,8 @@ public class TestAMWebServicesTasks extends JerseyTest {
|
||||||
WebServicesTestUtils.getXmlLong(element, "startTime"),
|
WebServicesTestUtils.getXmlLong(element, "startTime"),
|
||||||
WebServicesTestUtils.getXmlLong(element, "finishTime"),
|
WebServicesTestUtils.getXmlLong(element, "finishTime"),
|
||||||
WebServicesTestUtils.getXmlLong(element, "elapsedTime"),
|
WebServicesTestUtils.getXmlLong(element, "elapsedTime"),
|
||||||
WebServicesTestUtils.getXmlFloat(element, "progress"));
|
WebServicesTestUtils.getXmlFloat(element, "progress"),
|
||||||
|
WebServicesTestUtils.getXmlString(element, "status"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void verifyAMTaskXML(NodeList nodes, Job job) {
|
public void verifyAMTaskXML(NodeList nodes, Job job) {
|
||||||
|
|
|
@ -24,10 +24,10 @@ public interface TaskReport {
|
||||||
public abstract TaskId getTaskId();
|
public abstract TaskId getTaskId();
|
||||||
public abstract TaskState getTaskState();
|
public abstract TaskState getTaskState();
|
||||||
public abstract float getProgress();
|
public abstract float getProgress();
|
||||||
|
public abstract String getStatus();
|
||||||
public abstract long getStartTime();
|
public abstract long getStartTime();
|
||||||
public abstract long getFinishTime();
|
public abstract long getFinishTime();
|
||||||
public abstract Counters getCounters();
|
public abstract Counters getCounters();
|
||||||
|
|
||||||
public abstract List<TaskAttemptId> getRunningAttemptsList();
|
public abstract List<TaskAttemptId> getRunningAttemptsList();
|
||||||
public abstract TaskAttemptId getRunningAttempt(int index);
|
public abstract TaskAttemptId getRunningAttempt(int index);
|
||||||
public abstract int getRunningAttemptsCount();
|
public abstract int getRunningAttemptsCount();
|
||||||
|
@ -42,6 +42,7 @@ public interface TaskReport {
|
||||||
public abstract void setTaskId(TaskId taskId);
|
public abstract void setTaskId(TaskId taskId);
|
||||||
public abstract void setTaskState(TaskState taskState);
|
public abstract void setTaskState(TaskState taskState);
|
||||||
public abstract void setProgress(float progress);
|
public abstract void setProgress(float progress);
|
||||||
|
public abstract void setStatus(String status);
|
||||||
public abstract void setStartTime(long startTime);
|
public abstract void setStartTime(long startTime);
|
||||||
public abstract void setFinishTime(long finishTime);
|
public abstract void setFinishTime(long finishTime);
|
||||||
public abstract void setCounters(Counters counters);
|
public abstract void setCounters(Counters counters);
|
||||||
|
|
|
@ -49,6 +49,7 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
|
||||||
private List<TaskAttemptId> runningAttempts = null;
|
private List<TaskAttemptId> runningAttempts = null;
|
||||||
private TaskAttemptId successfulAttemptId = null;
|
private TaskAttemptId successfulAttemptId = null;
|
||||||
private List<String> diagnostics = null;
|
private List<String> diagnostics = null;
|
||||||
|
private String status;
|
||||||
|
|
||||||
|
|
||||||
public TaskReportPBImpl() {
|
public TaskReportPBImpl() {
|
||||||
|
@ -171,11 +172,22 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
|
||||||
return (p.getProgress());
|
return (p.getProgress());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setProgress(float progress) {
|
public void setProgress(float progress) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setProgress((progress));
|
builder.setProgress((progress));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setStatus(String status) {
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskState getTaskState() {
|
public TaskState getTaskState() {
|
||||||
TaskReportProtoOrBuilder p = viaProto ? proto : builder;
|
TaskReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
|
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
|
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
|
||||||
import org.apache.hadoop.yarn.util.Times;
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
|
@ -89,6 +90,7 @@ public class HsTaskPage extends HsView {
|
||||||
headRow.
|
headRow.
|
||||||
th(".id", "Attempt").
|
th(".id", "Attempt").
|
||||||
th(".state", "State").
|
th(".state", "State").
|
||||||
|
th(".status", "Status").
|
||||||
th(".node", "Node").
|
th(".node", "Node").
|
||||||
th(".logs", "Logs").
|
th(".logs", "Logs").
|
||||||
th(".tsh", "Start Time");
|
th(".tsh", "Start Time");
|
||||||
|
@ -113,15 +115,16 @@ public class HsTaskPage extends HsView {
|
||||||
// DataTables to display
|
// DataTables to display
|
||||||
StringBuilder attemptsTableData = new StringBuilder("[\n");
|
StringBuilder attemptsTableData = new StringBuilder("[\n");
|
||||||
|
|
||||||
for (TaskAttempt ta : getTaskAttempts()) {
|
for (TaskAttempt attempt : getTaskAttempts()) {
|
||||||
String taid = MRApps.toString(ta.getID());
|
final TaskAttemptInfo ta = new TaskAttemptInfo(attempt, false);
|
||||||
|
String taid = ta.getId();
|
||||||
|
|
||||||
String nodeHttpAddr = ta.getNodeHttpAddress();
|
String nodeHttpAddr = ta.getNode();
|
||||||
String containerIdString = ta.getAssignedContainerID().toString();
|
String containerIdString = ta.getAssignedContainerIdStr();
|
||||||
String nodeIdString = ta.getAssignedContainerMgrAddress();
|
String nodeIdString = attempt.getAssignedContainerMgrAddress();
|
||||||
String nodeRackName = ta.getNodeRackName();
|
String nodeRackName = ta.getRack();
|
||||||
|
|
||||||
long attemptStartTime = ta.getLaunchTime();
|
long attemptStartTime = ta.getStartTime();
|
||||||
long shuffleFinishTime = -1;
|
long shuffleFinishTime = -1;
|
||||||
long sortFinishTime = -1;
|
long sortFinishTime = -1;
|
||||||
long attemptFinishTime = ta.getFinishTime();
|
long attemptFinishTime = ta.getFinishTime();
|
||||||
|
@ -129,8 +132,8 @@ public class HsTaskPage extends HsView {
|
||||||
long elapsedSortTime = -1;
|
long elapsedSortTime = -1;
|
||||||
long elapsedReduceTime = -1;
|
long elapsedReduceTime = -1;
|
||||||
if(type == TaskType.REDUCE) {
|
if(type == TaskType.REDUCE) {
|
||||||
shuffleFinishTime = ta.getShuffleFinishTime();
|
shuffleFinishTime = attempt.getShuffleFinishTime();
|
||||||
sortFinishTime = ta.getSortFinishTime();
|
sortFinishTime = attempt.getSortFinishTime();
|
||||||
elapsedShuffleTime =
|
elapsedShuffleTime =
|
||||||
Times.elapsed(attemptStartTime, shuffleFinishTime, false);
|
Times.elapsed(attemptStartTime, shuffleFinishTime, false);
|
||||||
elapsedSortTime =
|
elapsedSortTime =
|
||||||
|
@ -140,11 +143,13 @@ public class HsTaskPage extends HsView {
|
||||||
}
|
}
|
||||||
long attemptElapsed =
|
long attemptElapsed =
|
||||||
Times.elapsed(attemptStartTime, attemptFinishTime, false);
|
Times.elapsed(attemptStartTime, attemptFinishTime, false);
|
||||||
int sortId = ta.getID().getId() + (ta.getID().getTaskId().getId() * 10000);
|
int sortId = attempt.getID().getId()
|
||||||
|
+ (attempt.getID().getTaskId().getId() * 10000);
|
||||||
|
|
||||||
attemptsTableData.append("[\"")
|
attemptsTableData.append("[\"")
|
||||||
.append(sortId + " ").append(taid).append("\",\"")
|
.append(sortId + " ").append(taid).append("\",\"")
|
||||||
.append(ta.getState().toString()).append("\",\"")
|
.append(ta.getState()).append("\",\"")
|
||||||
|
.append(ta.getStatus()).append("\",\"")
|
||||||
|
|
||||||
.append("<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>")
|
.append("<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>")
|
||||||
.append(nodeRackName + "/" + nodeHttpAddr + "</a>\",\"")
|
.append(nodeRackName + "/" + nodeHttpAddr + "</a>\",\"")
|
||||||
|
@ -167,8 +172,9 @@ public class HsTaskPage extends HsView {
|
||||||
.append(elapsedReduceTime).append("\",\"");
|
.append(elapsedReduceTime).append("\",\"");
|
||||||
}
|
}
|
||||||
attemptsTableData.append(attemptElapsed).append("\",\"")
|
attemptsTableData.append(attemptElapsed).append("\",\"")
|
||||||
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
|
.append(StringEscapeUtils.escapeJavaScript(
|
||||||
Joiner.on('\n').join(ta.getDiagnostics())))).append("\"],\n");
|
StringEscapeUtils.escapeHtml(ta.getNote())))
|
||||||
|
.append("\"],\n");
|
||||||
}
|
}
|
||||||
//Remove the last comma and close off the array of arrays
|
//Remove the last comma and close off the array of arrays
|
||||||
if(attemptsTableData.charAt(attemptsTableData.length() - 2) == ',') {
|
if(attemptsTableData.charAt(attemptsTableData.length() - 2) == ',') {
|
||||||
|
@ -184,6 +190,8 @@ public class HsTaskPage extends HsView {
|
||||||
$name("attempt_name").$value("Attempt")._()._().
|
$name("attempt_name").$value("Attempt")._()._().
|
||||||
th().input("search_init").$type(InputType.text).
|
th().input("search_init").$type(InputType.text).
|
||||||
$name("attempt_state").$value("State")._()._().
|
$name("attempt_state").$value("State")._()._().
|
||||||
|
th().input("search_init").$type(InputType.text).
|
||||||
|
$name("attempt_status").$value("Status")._()._().
|
||||||
th().input("search_init").$type(InputType.text).
|
th().input("search_init").$type(InputType.text).
|
||||||
$name("attempt_node").$value("Node")._()._().
|
$name("attempt_node").$value("Node")._()._().
|
||||||
th().input("search_init").$type(InputType.text).
|
th().input("search_init").$type(InputType.text).
|
||||||
|
@ -283,19 +291,19 @@ public class HsTaskPage extends HsView {
|
||||||
.append("\n,aoColumnDefs:[\n")
|
.append("\n,aoColumnDefs:[\n")
|
||||||
|
|
||||||
//logs column should not filterable (it includes container ID which may pollute searches)
|
//logs column should not filterable (it includes container ID which may pollute searches)
|
||||||
.append("\n{'aTargets': [ 3 ]")
|
.append("\n{'aTargets': [ 4 ]")
|
||||||
.append(", 'bSearchable': false }")
|
.append(", 'bSearchable': false }")
|
||||||
|
|
||||||
.append("\n, {'sType':'numeric', 'aTargets': [ 0 ]")
|
.append("\n, {'sType':'numeric', 'aTargets': [ 0 ]")
|
||||||
.append(", 'mRender': parseHadoopAttemptID }")
|
.append(", 'mRender': parseHadoopAttemptID }")
|
||||||
|
|
||||||
.append("\n, {'sType':'numeric', 'aTargets': [ 4, 5")
|
.append("\n, {'sType':'numeric', 'aTargets': [ 5, 6")
|
||||||
//Column numbers are different for maps and reduces
|
//Column numbers are different for maps and reduces
|
||||||
.append(type == TaskType.REDUCE ? ", 6, 7" : "")
|
.append(type == TaskType.REDUCE ? ", 7, 8" : "")
|
||||||
.append(" ], 'mRender': renderHadoopDate }")
|
.append(" ], 'mRender': renderHadoopDate }")
|
||||||
|
|
||||||
.append("\n, {'sType':'numeric', 'aTargets': [")
|
.append("\n, {'sType':'numeric', 'aTargets': [")
|
||||||
.append(type == TaskType.REDUCE ? "8, 9, 10, 11" : "6")
|
.append(type == TaskType.REDUCE ? "9, 10, 11, 12" : "7")
|
||||||
.append(" ], 'mRender': renderHadoopElapsedTime }]")
|
.append(" ], 'mRender': renderHadoopElapsedTime }]")
|
||||||
|
|
||||||
// Sort by id upon page load
|
// Sort by id upon page load
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
|
||||||
|
@ -138,11 +139,31 @@ public class TestBlocks {
|
||||||
when(attempt.getAssignedContainerMgrAddress()).thenReturn(
|
when(attempt.getAssignedContainerMgrAddress()).thenReturn(
|
||||||
"assignedContainerMgrAddress");
|
"assignedContainerMgrAddress");
|
||||||
when(attempt.getNodeRackName()).thenReturn("nodeRackName");
|
when(attempt.getNodeRackName()).thenReturn("nodeRackName");
|
||||||
when(attempt.getLaunchTime()).thenReturn(100002L);
|
|
||||||
when(attempt.getFinishTime()).thenReturn(100012L);
|
final long taStartTime = 100002L;
|
||||||
when(attempt.getShuffleFinishTime()).thenReturn(100010L);
|
final long taFinishTime = 100012L;
|
||||||
when(attempt.getSortFinishTime()).thenReturn(100011L);
|
final long taShuffleFinishTime = 100010L;
|
||||||
when(attempt.getState()).thenReturn(TaskAttemptState.SUCCEEDED);
|
final long taSortFinishTime = 100011L;
|
||||||
|
final TaskAttemptState taState = TaskAttemptState.SUCCEEDED;
|
||||||
|
|
||||||
|
when(attempt.getLaunchTime()).thenReturn(taStartTime);
|
||||||
|
when(attempt.getFinishTime()).thenReturn(taFinishTime);
|
||||||
|
when(attempt.getShuffleFinishTime()).thenReturn(taShuffleFinishTime);
|
||||||
|
when(attempt.getSortFinishTime()).thenReturn(taSortFinishTime);
|
||||||
|
when(attempt.getState()).thenReturn(taState);
|
||||||
|
|
||||||
|
TaskAttemptReport taReport = mock(TaskAttemptReport.class);
|
||||||
|
when(taReport.getStartTime()).thenReturn(taStartTime);
|
||||||
|
when(taReport.getFinishTime()).thenReturn(taFinishTime);
|
||||||
|
when(taReport.getShuffleFinishTime()).thenReturn(taShuffleFinishTime);
|
||||||
|
when(taReport.getSortFinishTime()).thenReturn(taSortFinishTime);
|
||||||
|
when(taReport.getContainerId()).thenReturn(containerId);
|
||||||
|
when(taReport.getProgress()).thenReturn(1.0f);
|
||||||
|
when(taReport.getStateString()).thenReturn("Processed 128/128 records");
|
||||||
|
when(taReport.getTaskAttemptState()).thenReturn(taState);
|
||||||
|
when(taReport.getDiagnosticInfo()).thenReturn("");
|
||||||
|
|
||||||
|
when(attempt.getReport()).thenReturn(taReport);
|
||||||
|
|
||||||
attempts.put(taId, attempt);
|
attempts.put(taId, attempt);
|
||||||
when(task.getAttempts()).thenReturn(attempts);
|
when(task.getAttempts()).thenReturn(attempts);
|
||||||
|
|
|
@ -444,9 +444,9 @@ public class TestHsWebServicesAttempts extends JerseyTest {
|
||||||
public void verifyHsTaskAttempt(JSONObject info, TaskAttempt att,
|
public void verifyHsTaskAttempt(JSONObject info, TaskAttempt att,
|
||||||
TaskType ttype) throws JSONException {
|
TaskType ttype) throws JSONException {
|
||||||
if (ttype == TaskType.REDUCE) {
|
if (ttype == TaskType.REDUCE) {
|
||||||
assertEquals("incorrect number of elements", 16, info.length());
|
assertEquals("incorrect number of elements", 17, info.length());
|
||||||
} else {
|
} else {
|
||||||
assertEquals("incorrect number of elements", 11, info.length());
|
assertEquals("incorrect number of elements", 12, info.length());
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
|
verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
|
||||||
|
@ -551,11 +551,11 @@ public class TestHsWebServicesAttempts extends JerseyTest {
|
||||||
assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
|
assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
|
||||||
mergeFinishTime);
|
mergeFinishTime);
|
||||||
assertEquals("elapsedShuffleTime wrong",
|
assertEquals("elapsedShuffleTime wrong",
|
||||||
ta.getLaunchTime() - ta.getShuffleFinishTime(), elapsedShuffleTime);
|
ta.getShuffleFinishTime() - ta.getLaunchTime(), elapsedShuffleTime);
|
||||||
assertEquals("elapsedMergeTime wrong",
|
assertEquals("elapsedMergeTime wrong",
|
||||||
ta.getShuffleFinishTime() - ta.getSortFinishTime(), elapsedMergeTime);
|
ta.getSortFinishTime() - ta.getShuffleFinishTime(), elapsedMergeTime);
|
||||||
assertEquals("elapsedReduceTime wrong",
|
assertEquals("elapsedReduceTime wrong",
|
||||||
ta.getSortFinishTime() - ta.getFinishTime(), elapsedReduceTime);
|
ta.getFinishTime() - ta.getSortFinishTime(), elapsedReduceTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -538,7 +538,7 @@ public class TestHsWebServicesTasks extends JerseyTest {
|
||||||
|
|
||||||
public void verifyHsSingleTask(JSONObject info, Task task)
|
public void verifyHsSingleTask(JSONObject info, Task task)
|
||||||
throws JSONException {
|
throws JSONException {
|
||||||
assertEquals("incorrect number of elements", 8, info.length());
|
assertEquals("incorrect number of elements", 9, info.length());
|
||||||
|
|
||||||
verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
|
verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
|
||||||
info.getString("type"), info.getString("successfulAttempt"),
|
info.getString("type"), info.getString("successfulAttempt"),
|
||||||
|
|
Loading…
Reference in New Issue