From cf5dd3293278c21747222b1ea2ef37a276565fa5 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Wed, 20 Jun 2012 21:22:33 +0000 Subject: [PATCH] MAPREDUCE-3889. job client tries to use /tasklog interface, but that doesn't exist anymore (Devaraj K via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1352330 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../java/org/apache/hadoop/mapreduce/Job.java | 81 +------------------ 2 files changed, 4 insertions(+), 80 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1f332e2b786..48c08b3efb6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -595,6 +595,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4270. Move the data_join test classes to the correct path. (Thomas Graves via sseth) + MAPREDUCE-3889. job client tries to use /tasklog interface, but that + doesn't exist anymore (Devaraj K via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 51bac982285..2fd666e8272 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -18,30 +18,19 @@ package org.apache.hadoop.mapreduce; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.net.URL; -import java.net.URLConnection; import java.net.URI; import java.security.PrivilegedExceptionAction; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.filecache.DistributedCache; @@ -1367,14 +1356,6 @@ private void printTaskEvents(TaskCompletionEvent[] events, Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges, IntegerRanges reduceRanges) throws IOException, InterruptedException { for (TaskCompletionEvent event : events) { - TaskCompletionEvent.Status status = event.getStatus(); - if (profiling && shouldDownloadProfile() && - (status == TaskCompletionEvent.Status.SUCCEEDED || - status == TaskCompletionEvent.Status.FAILED) && - (event.isMapTask() ? mapRanges : reduceRanges). - isIncluded(event.idWithinJob())) { - downloadProfile(event); - } switch (filter) { case NONE: break; @@ -1382,7 +1363,6 @@ private void printTaskEvents(TaskCompletionEvent[] events, if (event.getStatus() == TaskCompletionEvent.Status.SUCCEEDED) { LOG.info(event.toString()); - displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp()); } break; case FAILED: @@ -1397,8 +1377,6 @@ private void printTaskEvents(TaskCompletionEvent[] events, System.err.println(diagnostics); } } - // Displaying the task logs - displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp()); } break; case KILLED: @@ -1408,67 +1386,10 @@ private void printTaskEvents(TaskCompletionEvent[] events, break; case ALL: LOG.info(event.toString()); - displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp()); break; } } } - - private void downloadProfile(TaskCompletionEvent e) throws IOException { - URLConnection connection = new URL( - getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + - "&filter=profile").openConnection(); - InputStream in = connection.getInputStream(); - OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile"); - IOUtils.copyBytes(in, out, 64 * 1024, true); - } - - private void displayTaskLogs(TaskAttemptID taskId, String baseUrl) - throws IOException { - // The tasktracker for a 'failed/killed' job might not be around... - if (baseUrl != null) { - // Construct the url for the tasklogs - String taskLogUrl = getTaskLogURL(taskId, baseUrl); - - // Copy tasks's stdout of the JobClient - getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out); - - // Copy task's stderr to stderr of the JobClient - getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err); - } - } - - private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, - OutputStream out) { - try { - int tasklogtimeout = cluster.getConf().getInt( - TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT); - URLConnection connection = taskLogUrl.openConnection(); - connection.setReadTimeout(tasklogtimeout); - connection.setConnectTimeout(tasklogtimeout); - BufferedReader input = - new BufferedReader(new InputStreamReader(connection.getInputStream())); - BufferedWriter output = - new BufferedWriter(new OutputStreamWriter(out)); - try { - String logData = null; - while ((logData = input.readLine()) != null) { - if (logData.length() > 0) { - output.write(taskId + ": " + logData + "\n"); - output.flush(); - } - } - } finally { - input.close(); - } - } catch(IOException ioe) { - LOG.warn("Error reading task output " + ioe.getMessage()); - } - } - - private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { - return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); - } /** The interval at which monitorAndPrintJob() prints status */ public static int getProgressPollInterval(Configuration conf) {