svn merge -c 1352330. FIXES: MAPREDUCE-3889. job client tries to use /tasklog interface, but that doesn't exist anymore (Devaraj K via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1352335 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-06-20 21:32:21 +00:00
parent 3b61e3693f
commit f63255faec
2 changed files with 4 additions and 80 deletions

View File

@ -477,6 +477,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4270. Move the data_join test classes to the correct path. MAPREDUCE-4270. Move the data_join test classes to the correct path.
(Thomas Graves via sseth) (Thomas Graves via sseth)
MAPREDUCE-3889. job client tries to use /tasklog interface, but that
doesn't exist anymore (Devaraj K via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -18,30 +18,19 @@
package org.apache.hadoop.mapreduce; package org.apache.hadoop.mapreduce;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLConnection;
import java.net.URI; import java.net.URI;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@ -1367,14 +1356,6 @@ public class Job extends JobContextImpl implements JobContext {
Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges, Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
IntegerRanges reduceRanges) throws IOException, InterruptedException { IntegerRanges reduceRanges) throws IOException, InterruptedException {
for (TaskCompletionEvent event : events) { for (TaskCompletionEvent event : events) {
TaskCompletionEvent.Status status = event.getStatus();
if (profiling && shouldDownloadProfile() &&
(status == TaskCompletionEvent.Status.SUCCEEDED ||
status == TaskCompletionEvent.Status.FAILED) &&
(event.isMapTask() ? mapRanges : reduceRanges).
isIncluded(event.idWithinJob())) {
downloadProfile(event);
}
switch (filter) { switch (filter) {
case NONE: case NONE:
break; break;
@ -1382,7 +1363,6 @@ public class Job extends JobContextImpl implements JobContext {
if (event.getStatus() == if (event.getStatus() ==
TaskCompletionEvent.Status.SUCCEEDED) { TaskCompletionEvent.Status.SUCCEEDED) {
LOG.info(event.toString()); LOG.info(event.toString());
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
} }
break; break;
case FAILED: case FAILED:
@ -1397,8 +1377,6 @@ public class Job extends JobContextImpl implements JobContext {
System.err.println(diagnostics); System.err.println(diagnostics);
} }
} }
// Displaying the task logs
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
} }
break; break;
case KILLED: case KILLED:
@ -1408,67 +1386,10 @@ public class Job extends JobContextImpl implements JobContext {
break; break;
case ALL: case ALL:
LOG.info(event.toString()); LOG.info(event.toString());
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
break; break;
} }
} }
} }
private void downloadProfile(TaskCompletionEvent e) throws IOException {
URLConnection connection = new URL(
getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
"&filter=profile").openConnection();
InputStream in = connection.getInputStream();
OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
IOUtils.copyBytes(in, out, 64 * 1024, true);
}
private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
throws IOException {
// The tasktracker for a 'failed/killed' job might not be around...
if (baseUrl != null) {
// Construct the url for the tasklogs
String taskLogUrl = getTaskLogURL(taskId, baseUrl);
// Copy tasks's stdout of the JobClient
getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
// Copy task's stderr to stderr of the JobClient
getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
}
}
private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
OutputStream out) {
try {
int tasklogtimeout = cluster.getConf().getInt(
TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
URLConnection connection = taskLogUrl.openConnection();
connection.setReadTimeout(tasklogtimeout);
connection.setConnectTimeout(tasklogtimeout);
BufferedReader input =
new BufferedReader(new InputStreamReader(connection.getInputStream()));
BufferedWriter output =
new BufferedWriter(new OutputStreamWriter(out));
try {
String logData = null;
while ((logData = input.readLine()) != null) {
if (logData.length() > 0) {
output.write(taskId + ": " + logData + "\n");
output.flush();
}
}
} finally {
input.close();
}
} catch(IOException ioe) {
LOG.warn("Error reading task output " + ioe.getMessage());
}
}
private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
}
/** The interval at which monitorAndPrintJob() prints status */ /** The interval at which monitorAndPrintJob() prints status */
public static int getProgressPollInterval(Configuration conf) { public static int getProgressPollInterval(Configuration conf) {