MAPREDUCE-3889. job client tries to use /tasklog interface, but that doesn't exist anymore (Devaraj K via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1352330 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-06-20 21:22:33 +00:00
parent daa28cc6ce
commit cf5dd32932
2 changed files with 4 additions and 80 deletions

View File

@ -595,6 +595,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4270. Move the data_join test classes to the correct path.
(Thomas Graves via sseth)
MAPREDUCE-3889. job client tries to use /tasklog interface, but that
doesn't exist anymore (Devaraj K via bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -18,30 +18,19 @@
package org.apache.hadoop.mapreduce;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLConnection;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@ -1367,14 +1356,6 @@ public class Job extends JobContextImpl implements JobContext {
Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
IntegerRanges reduceRanges) throws IOException, InterruptedException {
for (TaskCompletionEvent event : events) {
TaskCompletionEvent.Status status = event.getStatus();
if (profiling && shouldDownloadProfile() &&
(status == TaskCompletionEvent.Status.SUCCEEDED ||
status == TaskCompletionEvent.Status.FAILED) &&
(event.isMapTask() ? mapRanges : reduceRanges).
isIncluded(event.idWithinJob())) {
downloadProfile(event);
}
switch (filter) {
case NONE:
break;
@ -1382,7 +1363,6 @@ public class Job extends JobContextImpl implements JobContext {
if (event.getStatus() ==
TaskCompletionEvent.Status.SUCCEEDED) {
LOG.info(event.toString());
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
}
break;
case FAILED:
@ -1397,8 +1377,6 @@ public class Job extends JobContextImpl implements JobContext {
System.err.println(diagnostics);
}
}
// Displaying the task logs
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
}
break;
case KILLED:
@ -1408,67 +1386,10 @@ public class Job extends JobContextImpl implements JobContext {
break;
case ALL:
LOG.info(event.toString());
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
break;
}
}
}
private void downloadProfile(TaskCompletionEvent e) throws IOException {
URLConnection connection = new URL(
getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
"&filter=profile").openConnection();
InputStream in = connection.getInputStream();
OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
IOUtils.copyBytes(in, out, 64 * 1024, true);
}
private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
throws IOException {
// The tasktracker for a 'failed/killed' job might not be around...
if (baseUrl != null) {
// Construct the url for the tasklogs
String taskLogUrl = getTaskLogURL(taskId, baseUrl);
// Copy tasks's stdout of the JobClient
getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
// Copy task's stderr to stderr of the JobClient
getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
}
}
private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
OutputStream out) {
try {
int tasklogtimeout = cluster.getConf().getInt(
TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
URLConnection connection = taskLogUrl.openConnection();
connection.setReadTimeout(tasklogtimeout);
connection.setConnectTimeout(tasklogtimeout);
BufferedReader input =
new BufferedReader(new InputStreamReader(connection.getInputStream()));
BufferedWriter output =
new BufferedWriter(new OutputStreamWriter(out));
try {
String logData = null;
while ((logData = input.readLine()) != null) {
if (logData.length() > 0) {
output.write(taskId + ": " + logData + "\n");
output.flush();
}
}
} finally {
input.close();
}
} catch(IOException ioe) {
LOG.warn("Error reading task output " + ioe.getMessage());
}
}
private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
}
/** The interval at which monitorAndPrintJob() prints status */
public static int getProgressPollInterval(Configuration conf) {