MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279 merge.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1157249 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-08-12 21:00:17 +00:00
parent 504b801ca0
commit ded6f225a5
25 changed files with 416 additions and 123 deletions

View File

@ -388,6 +388,10 @@ Trunk (unreleased changes)
MAPREDUCE-2805. Update RAID for HDFS-2241. (szetszwo) MAPREDUCE-2805. Update RAID for HDFS-2241. (szetszwo)
MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279
merge. (acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -61,19 +61,32 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
private static List<InputSplit> lastResult = null; private static List<InputSplit> lastResult = null;
static class TeraFileSplit extends FileSplit { static class TeraFileSplit extends FileSplit {
static private String[] ZERO_LOCATIONS = new String[0];
private String[] locations; private String[] locations;
public TeraFileSplit() {}
public TeraFileSplit() {
locations = ZERO_LOCATIONS;
}
public TeraFileSplit(Path file, long start, long length, String[] hosts) { public TeraFileSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts); super(file, start, length, hosts);
locations = hosts; try {
locations = super.getLocations();
} catch (IOException e) {
locations = ZERO_LOCATIONS;
} }
}
// XXXXXX should this also be null-protected?
protected void setLocations(String[] hosts) { protected void setLocations(String[] hosts) {
locations = hosts; locations = hosts;
} }
@Override @Override
public String[] getLocations() { public String[] getLocations() {
return locations; return locations;
} }
public String toString() { public String toString() {
StringBuffer result = new StringBuffer(); StringBuffer result = new StringBuffer();
result.append(getPath()); result.append(getPath());

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
* QueueManager for queue operations. * QueueManager for queue operations.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class ACLsManager { public class ACLsManager {
static Log LOG = LogFactory.getLog(ACLsManager.class); static Log LOG = LogFactory.getLog(ACLsManager.class);
// MROwner(user who started this mapreduce cluster)'s ugi // MROwner(user who started this mapreduce cluster)'s ugi
@ -49,7 +49,7 @@ class ACLsManager {
private final boolean aclsEnabled; private final boolean aclsEnabled;
ACLsManager(Configuration conf, JobACLsManager jobACLsManager, public ACLsManager(Configuration conf, JobACLsManager jobACLsManager,
QueueManager queueManager) throws IOException { QueueManager queueManager) throws IOException {
mrOwner = UserGroupInformation.getCurrentUser(); mrOwner = UserGroupInformation.getCurrentUser();
@ -68,7 +68,7 @@ class ACLsManager {
this.queueManager = queueManager; this.queueManager = queueManager;
} }
UserGroupInformation getMROwner() { public UserGroupInformation getMROwner() {
return mrOwner; return mrOwner;
} }
@ -76,7 +76,7 @@ class ACLsManager {
return adminAcl; return adminAcl;
} }
JobACLsManager getJobACLsManager() { public JobACLsManager getJobACLsManager() {
return jobACLsManager; return jobACLsManager;
} }
@ -85,7 +85,7 @@ class ACLsManager {
* i.e. either cluster owner or cluster administrator * i.e. either cluster owner or cluster administrator
* @return true, if user is an admin * @return true, if user is an admin
*/ */
boolean isMRAdmin(UserGroupInformation callerUGI) { public boolean isMRAdmin(UserGroupInformation callerUGI) {
if (adminAcl.isUserAllowed(callerUGI)) { if (adminAcl.isUserAllowed(callerUGI)) {
return true; return true;
} }
@ -111,7 +111,7 @@ class ACLsManager {
* @param operation the operation for which authorization is needed * @param operation the operation for which authorization is needed
* @throws AccessControlException * @throws AccessControlException
*/ */
void checkAccess(JobInProgress job, UserGroupInformation callerUGI, public void checkAccess(JobInProgress job, UserGroupInformation callerUGI,
Operation operation) throws AccessControlException { Operation operation) throws AccessControlException {
String queue = job.getProfile().getQueueName(); String queue = job.getProfile().getQueueName();

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
@InterfaceAudience.Private @InterfaceAudience.Private
class JobACLsManager { public class JobACLsManager {
Configuration conf; Configuration conf;
@ -37,7 +37,7 @@ class JobACLsManager {
this.conf = conf; this.conf = conf;
} }
boolean areACLsEnabled() { public boolean areACLsEnabled() {
return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false); return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
} }
@ -86,7 +86,7 @@ class JobACLsManager {
* @param jobACL * @param jobACL
* @throws AccessControlException * @throws AccessControlException
*/ */
boolean checkAccess(UserGroupInformation callerUGI, public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation, String jobOwner, AccessControlList jobACL) { JobACL jobOperation, String jobOwner, AccessControlList jobACL) {
String user = callerUGI.getShortUserName(); String user = callerUGI.getShortUserName();

View File

@ -240,7 +240,7 @@ public class LocalJobRunner implements ClientProtocol {
getShortUserName()); getShortUserName());
TaskRunner.setupChildMapredLocalDirs(map, localConf); TaskRunner.setupChildMapredLocalDirs(map, localConf);
MapOutputFile mapOutput = new MapOutputFile(); MapOutputFile mapOutput = new MROutputFiles();
mapOutput.setConf(localConf); mapOutput.setConf(localConf);
mapOutputFiles.put(mapId, mapOutput); mapOutputFiles.put(mapId, mapOutput);
@ -404,7 +404,7 @@ public class LocalJobRunner implements ClientProtocol {
if (!this.isInterrupted()) { if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i); TaskAttemptID mapId = mapIds.get(i);
Path mapOut = mapOutputFiles.get(mapId).getOutputFile(); Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
MapOutputFile localOutputFile = new MapOutputFile(); MapOutputFile localOutputFile = new MROutputFiles();
localOutputFile.setConf(localConf); localOutputFile.setConf(localConf);
Path reduceIn = Path reduceIn =
localOutputFile.getInputFileForWrite(mapId.getTaskID(), localOutputFile.getInputFileForWrite(mapId.getTaskID(),

View File

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
/**
* Manipulate the working area for the transient store for maps and reduces.
*
* This class is used by map and reduce tasks to identify the directories that
* they need to write to/read from for intermediate files. The callers of
* these methods are from the Child running the Task.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MROutputFiles extends MapOutputFile {
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator(MRConfig.LOCAL_DIR);
public MROutputFiles() {
}
/**
* Return the path to local map output file created earlier
*
* @return path
* @throws IOException
*/
@Override
public Path getOutputFile()
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, getConf());
}
/**
* Create a local map output file name.
*
* @param size the size of the file
* @return path
* @throws IOException
*/
@Override
public Path getOutputFileForWrite(long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, size, getConf());
}
/**
* Create a local map output file name on the same volume.
*/
@Override
public Path getOutputFileForWriteInVolume(Path existing) {
return new Path(existing.getParent(), MAP_OUTPUT_FILENAME_STRING);
}
/**
* Return the path to a local map output index file created earlier
*
* @return path
* @throws IOException
*/
@Override
public Path getOutputIndexFile()
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
getConf());
}
/**
* Create a local map output index file name.
*
* @param size the size of the file
* @return path
* @throws IOException
*/
@Override
public Path getOutputIndexFileForWrite(long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
size, getConf());
}
/**
* Create a local map output index file name on the same volume.
*/
@Override
public Path getOutputIndexFileForWriteInVolume(Path existing) {
return new Path(existing.getParent(),
MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING);
}
/**
* Return a local map spill file created earlier.
*
* @param spillNumber the number
* @return path
* @throws IOException
*/
@Override
public Path getSpillFile(int spillNumber)
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out", getConf());
}
/**
* Create a local map spill file name.
*
* @param spillNumber the number
* @param size the size of the file
* @return path
* @throws IOException
*/
@Override
public Path getSpillFileForWrite(int spillNumber, long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out", size, getConf());
}
/**
* Return a local map spill index file created earlier
*
* @param spillNumber the number
* @return path
* @throws IOException
*/
@Override
public Path getSpillIndexFile(int spillNumber)
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out.index", getConf());
}
/**
* Create a local map spill index file name.
*
* @param spillNumber the number
* @param size the size of the file
* @return path
* @throws IOException
*/
@Override
public Path getSpillIndexFileForWrite(int spillNumber, long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out.index", size, getConf());
}
/**
* Return a local reduce input file created earlier
*
* @param mapId a map task id
* @return path
* @throws IOException
*/
@Override
public Path getInputFile(int mapId)
throws IOException {
return lDirAlloc.getLocalPathToRead(String.format(
REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
.valueOf(mapId)), getConf());
}
/**
* Create a local reduce input file name.
*
* @param mapId a map task id
* @param size the size of the file
* @return path
* @throws IOException
*/
@Override
public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(String.format(
REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
size, getConf());
}
/** Removes all of the files related to a task. */
@Override
public void removeAll()
throws IOException {
((JobConf)getConf()).deleteLocalFiles(TaskTracker.OUTPUT);
}
@Override
public void setConf(Configuration conf) {
if (!(conf instanceof JobConf)) {
conf = new JobConf(conf);
}
super.setConf(conf);
}
}

View File

@ -23,9 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
/** /**
* Manipulate the working area for the transient store for maps and reduces. * Manipulate the working area for the transient store for maps and reduces.
@ -38,29 +37,24 @@ import org.apache.hadoop.mapreduce.MRConfig;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class MapOutputFile { public abstract class MapOutputFile implements Configurable {
private JobConf conf; private Configuration conf;
static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out"; static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
public MapOutputFile() { public MapOutputFile() {
} }
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator(MRConfig.LOCAL_DIR);
/** /**
* Return the path to local map output file created earlier * Return the path to local map output file created earlier
* *
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getOutputFile() public abstract Path getOutputFile() throws IOException;
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ "file.out", conf);
}
/** /**
* Create a local map output file name. * Create a local map output file name.
@ -69,11 +63,12 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getOutputFileForWrite(long size) public abstract Path getOutputFileForWrite(long size) throws IOException;
throws IOException {
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR /**
+ "file.out", size, conf); * Create a local map output file name on the same volume.
} */
public abstract Path getOutputFileForWriteInVolume(Path existing);
/** /**
* Return the path to a local map output index file created earlier * Return the path to a local map output index file created earlier
@ -81,11 +76,7 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getOutputIndexFile() public abstract Path getOutputIndexFile() throws IOException;
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ "file.out.index", conf);
}
/** /**
* Create a local map output index file name. * Create a local map output index file name.
@ -94,11 +85,12 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getOutputIndexFileForWrite(long size) public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
throws IOException {
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR /**
+ "file.out.index", size, conf); * Create a local map output index file name on the same volume.
} */
public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
/** /**
* Return a local map spill file created earlier. * Return a local map spill file created earlier.
@ -107,11 +99,7 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getSpillFile(int spillNumber) public abstract Path getSpillFile(int spillNumber) throws IOException;
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out", conf);
}
/** /**
* Create a local map spill file name. * Create a local map spill file name.
@ -121,11 +109,8 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getSpillFileForWrite(int spillNumber, long size) public abstract Path getSpillFileForWrite(int spillNumber, long size)
throws IOException { throws IOException;
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out", size, conf);
}
/** /**
* Return a local map spill index file created earlier * Return a local map spill index file created earlier
@ -134,11 +119,7 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getSpillIndexFile(int spillNumber) public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
throws IOException {
return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out.index", conf);
}
/** /**
* Create a local map spill index file name. * Create a local map spill index file name.
@ -148,11 +129,8 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getSpillIndexFileForWrite(int spillNumber, long size) public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
throws IOException { throws IOException;
return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ spillNumber + ".out.index", size, conf);
}
/** /**
* Return a local reduce input file created earlier * Return a local reduce input file created earlier
@ -161,12 +139,7 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getInputFile(int mapId) public abstract Path getInputFile(int mapId) throws IOException;
throws IOException {
return lDirAlloc.getLocalPathToRead(String.format(
REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
.valueOf(mapId)), conf);
}
/** /**
* Create a local reduce input file name. * Create a local reduce input file name.
@ -176,26 +149,20 @@ public class MapOutputFile {
* @return path * @return path
* @throws IOException * @throws IOException
*/ */
public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, public abstract Path getInputFileForWrite(
long size) org.apache.hadoop.mapreduce.TaskID mapId, long size) throws IOException;
throws IOException {
return lDirAlloc.getLocalPathForWrite(String.format(
REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
size, conf);
}
/** Removes all of the files related to a task. */ /** Removes all of the files related to a task. */
public void removeAll() public abstract void removeAll() throws IOException;
throws IOException {
conf.deleteLocalFiles(TaskTracker.OUTPUT); @Override
public void setConf(Configuration conf) {
this.conf = conf;
} }
public void setConf(Configuration conf) { @Override
if (conf instanceof JobConf) { public Configuration getConf() {
this.conf = (JobConf) conf; return conf;
} else {
this.conf = new JobConf(conf);
}
} }
} }

View File

@ -1735,13 +1735,13 @@ class MapTask extends Task {
} }
if (numSpills == 1) { //the spill is the final output if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0], rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out")); mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
if (indexCacheList.size() == 0) { if (indexCacheList.size() == 0) {
rfs.rename(mapOutputFile.getSpillIndexFile(0), rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index")); mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
} else { } else {
indexCacheList.get(0).writeToFile( indexCacheList.get(0).writeToFile(
new Path(filename[0].getParent(),"file.out.index"), job); mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
} }
return; return;
} }

View File

@ -362,7 +362,8 @@ public class ReduceTask extends Task {
shuffledMapsCounter, shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter, reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter, mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this); taskStatus, copyPhase, sortPhase, this,
mapOutputFile);
rIter = shuffle.run(); rIter = shuffle.run();
} else { } else {
// local job runner doesn't have a copy phase // local job runner doesn't have a copy phase

View File

@ -146,7 +146,7 @@ abstract public class Task implements Writable, Configurable {
private long initCpuCumulativeTime = 0; private long initCpuCumulativeTime = 0;
protected JobConf conf; protected JobConf conf;
protected MapOutputFile mapOutputFile = new MapOutputFile(); protected MapOutputFile mapOutputFile;
protected LocalDirAllocator lDirAlloc; protected LocalDirAllocator lDirAlloc;
private final static int MAX_RETRIES = 10; private final static int MAX_RETRIES = 10;
protected JobContext jobContext; protected JobContext jobContext;
@ -1150,7 +1150,9 @@ abstract public class Task implements Writable, Configurable {
} else { } else {
this.conf = new JobConf(conf); this.conf = new JobConf(conf);
} }
this.mapOutputFile.setConf(this.conf); this.mapOutputFile = ReflectionUtils.newInstance(
conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
MROutputFiles.class, MapOutputFile.class), conf);
this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR); this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
// add the static resolutions (this is required for the junit to // add the static resolutions (this is required for the junit to
// work on testcases that simulate multiple nodes on a single physical // work on testcases that simulate multiple nodes on a single physical

View File

@ -227,8 +227,12 @@ class TaskMemoryManagerThread extends Thread {
continue; // processTree cannot be tracked continue; // processTree cannot be tracked
} }
if (taskTracker.runningTasks.get(tid).wasKilled()) { TaskInProgress tip = taskTracker.getRunningTask(tid);
continue; // this task has been killed already if (tip == null) {
continue;
}
if (tip.wasKilled()) {
continue;
} }
LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = " LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
@ -514,6 +518,12 @@ class TaskMemoryManagerThread extends Thread {
* @param msg diagnostics message * @param msg diagnostics message
*/ */
private void killTask(TaskAttemptID tid, String msg) { private void killTask(TaskAttemptID tid, String msg) {
TaskInProgress tip = taskTracker.getRunningTask(tid);
if (tip != null) {
//for the task identified to be killed update taskDiagnostic
TaskStatus taskStatus = tip.getStatus();
taskStatus.setDiagnosticInfo(msg);
}
// Kill the task and mark it as killed. // Kill the task and mark it as killed.
taskTracker.cleanUpOverMemoryTask(tid, false, msg); taskTracker.cleanUpOverMemoryTask(tid, false, msg);
// Now destroy the ProcessTree, remove it from monitoring map. // Now destroy the ProcessTree, remove it from monitoring map.
@ -530,7 +540,7 @@ class TaskMemoryManagerThread extends Thread {
* @return true if the task can be killed * @return true if the task can be killed
*/ */
private boolean isKillable(TaskAttemptID tid) { private boolean isKillable(TaskAttemptID tid) {
TaskInProgress tip = taskTracker.runningTasks.get(tid); TaskInProgress tip = taskTracker.getRunningTask(tid);
return tip != null && !tip.wasKilled() && return tip != null && !tip.wasKilled() &&
(tip.getRunState() == TaskStatus.State.RUNNING || (tip.getRunState() == TaskStatus.State.RUNNING ||
tip.getRunState() == TaskStatus.State.COMMIT_PENDING); tip.getRunState() == TaskStatus.State.COMMIT_PENDING);

View File

@ -4218,4 +4218,8 @@ public class TaskTracker
ACLsManager getACLsManager() { ACLsManager getACLsManager() {
return aclsManager; return aclsManager;
} }
/**
 * Look up a task attempt in {@code runningTasks} while holding this
 * object's monitor.
 *
 * @param tid the task attempt to look up
 * @return whatever {@code runningTasks.get(tid)} yields for that attempt
 */
synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
  final TaskInProgress tip = runningTasks.get(tid);
  return tip;
}
} }

View File

@ -360,8 +360,11 @@ public class Job extends JobContextImpl implements JobContext {
@Override @Override
public String toString() { public String toString() {
ensureState(JobState.RUNNING); ensureState(JobState.RUNNING);
String reasonforFailure = " ";
try { try {
updateStatus(); updateStatus();
if (status.getState().equals(JobStatus.State.FAILED))
reasonforFailure = getTaskFailureEventString();
} catch (IOException e) { } catch (IOException e) {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
} }
@ -378,10 +381,34 @@ public class Job extends JobContextImpl implements JobContext {
sb.append(status.getState()).append("\n"); sb.append(status.getState()).append("\n");
sb.append("history URL: "); sb.append("history URL: ");
sb.append(status.getHistoryFile()).append("\n"); sb.append(status.getHistoryFile()).append("\n");
sb.append("retired: ").append(status.isRetired()); sb.append("retired: ").append(status.isRetired()).append("\n");
sb.append("reason for failure: ").append(reasonforFailure);
return sb.toString(); return sb.toString();
} }
/**
 * Build a one-line description of the task failure behind a failed job,
 * based on the task completion events returned for the request
 * {@code (jobId, 0, 10)}.
 *
 * @return a message naming the task of the last FAILED event seen, a failure
 *         count and that event's tasktracker HTTP address; or a fallback
 *         message when no FAILED event was returned
 * @throws IOException if fetching the completion events fails
 * @throws InterruptedException if fetching the completion events is interrupted
 */
String getTaskFailureEventString() throws IOException,
    InterruptedException {
  // NOTE(review): the counter starts at 1, so the reported number is one more
  // than the FAILED events seen in this batch — confirm whether that is intended.
  int failCount = 1;
  TaskCompletionEvent lastEvent = null;
  for (TaskCompletionEvent event : cluster.getClient().getTaskCompletionEvents(
      status.getJobID(), 0, 10)) {
    if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
      failCount++;
      lastEvent = event;
    }
  }
  if (lastEvent == null) {
    // Nothing in the fetched events was FAILED. Dereferencing lastEvent here
    // would throw a NullPointerException out of toString(), which only
    // catches IOException and InterruptedException.
    return " no failed task was found in the task completion events;"
        + " check the job logs for details";
  }
  // Derive the task id from the attempt id by dropping everything up to the
  // first '_' and the trailing "_<attempt number>". Cutting at the last '_'
  // (rather than removing a fixed two characters) also handles attempt
  // numbers with more than one digit.
  String attemptId = lastEvent.getTaskAttemptId().toString();
  int prefixEnd = attemptId.indexOf('_');
  int suffixStart = attemptId.lastIndexOf('_');
  String taskID = (prefixEnd >= 0 && suffixStart > prefixEnd)
      ? attemptId.substring(prefixEnd + 1, suffixStart)
      : attemptId; // unexpected shape: report the id unmodified instead of failing
  return (" task " + taskID + " failed " +
      failCount + " times " + "For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
}
/** /**
* Get the information of the current state of the tasks of a job. * Get the information of the current state of the tasks of a job.
* *

View File

@ -59,4 +59,6 @@ public interface MRConfig {
7*24*60*60*1000; // 7 days 7*24*60*60*1000; // 7 days
public static final String FRAMEWORK_NAME = "mapreduce.framework.name"; public static final String FRAMEWORK_NAME = "mapreduce.framework.name";
/**
 * Configuration key naming the {@code MapOutputFile} implementation class
 * that a task instantiates for its local intermediate output.
 */
public static final String TASK_LOCAL_OUTPUT_CLASS =
"mapreduce.task.local.output.class";
} }

View File

@ -262,6 +262,8 @@ public class JobHistoryParser {
taskInfo.finishTime = event.getFinishTime(); taskInfo.finishTime = event.getFinishTime();
taskInfo.error = event.getError(); taskInfo.error = event.getError();
taskInfo.failedDueToAttemptId = event.getFailedAttemptID(); taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
info.errorInfo = "Task " + taskInfo.taskId +" failed " +
taskInfo.attemptsMap.size() + " times ";
} }
private void handleTaskStartedEvent(TaskStartedEvent event) { private void handleTaskStartedEvent(TaskStartedEvent event) {
@ -321,6 +323,7 @@ public class JobHistoryParser {
* The class where job information is aggregated into after parsing * The class where job information is aggregated into after parsing
*/ */
public static class JobInfo { public static class JobInfo {
String errorInfo = "None";
long submitTime; long submitTime;
long finishTime; long finishTime;
JobID jobid; JobID jobid;
@ -406,6 +409,7 @@ public class JobHistoryParser {
public long getFinishedReduces() { return finishedReduces; } public long getFinishedReduces() { return finishedReduces; }
/** Get the job status */ /** Get the job status */
public String getJobStatus() { return jobStatus; } public String getJobStatus() { return jobStatus; }
/** Get the error summary recorded for the job; the field's initial value is "None" */
public String getErrorInfo() { return errorInfo; }
/** Get the counters for the job */ /** Get the counters for the job */
public Counters getTotalCounters() { return totalCounters; } public Counters getTotalCounters() { return totalCounters; }
/** Get the map counters for the job */ /** Get the map counters for the job */

View File

@ -67,7 +67,7 @@ public class TaskFinishedEvent implements HistoryEvent {
/** Get the task finish time */ /** Get the task finish time */
public long getFinishTime() { return datum.finishTime; } public long getFinishTime() { return datum.finishTime; }
/** Get task counters */ /** Get task counters */
Counters getCounters() { return EventReader.fromAvro(datum.counters); } public Counters getCounters() { return EventReader.fromAvro(datum.counters); }
/** Get task type */ /** Get task type */
public TaskType getTaskType() { public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString()); return TaskType.valueOf(datum.taskType.toString());

View File

@ -34,10 +34,10 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosName;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -92,6 +92,13 @@ public class TokenCache {
} }
} }
/**
 * Resolve the JobTracker principal, which callers use as the delegation
 * token renewer.
 *
 * @param conf configuration supplying the JobTracker address and the
 *             {@code JTConfig.JT_USER_NAME} setting
 * @return the result of {@code SecurityUtil.getServerPrincipal} for the
 *         configured JobTracker user name and the JobTracker's host name
 * @throws IOException declared for the calls made here
 */
static String getJTPrincipal(Configuration conf) throws IOException {
  final String jobTrackerHost = JobTracker.getAddress(conf).getHostName();
  final String configuredPrincipal = conf.get(JTConfig.JT_USER_NAME);
  // get jobtracker principal for use as delegation token renewer
  return SecurityUtil.getServerPrincipal(configuredPrincipal, jobTrackerHost);
}
/** /**
* get delegation token for a specific FS * get delegation token for a specific FS
* @param fs * @param fs
@ -102,12 +109,11 @@ public class TokenCache {
*/ */
static void obtainTokensForNamenodesInternal(FileSystem fs, static void obtainTokensForNamenodesInternal(FileSystem fs,
Credentials credentials, Configuration conf) throws IOException { Credentials credentials, Configuration conf) throws IOException {
String delegTokenRenewer = getJTPrincipal(conf);
// get jobtracker principal id (for the renewer) if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
KerberosName jtKrbName = throw new IOException(
new KerberosName(conf.get(JTConfig.JT_USER_NAME,"")); "Can't get JobTracker Kerberos principal for use as renewer");
}
String delegTokenRenewer = jtKrbName.getShortName();
boolean readFile = true; boolean readFile = true;
String fsName = fs.getCanonicalServiceName(); String fsName = fs.getCanonicalServiceName();

View File

@ -133,7 +133,7 @@ public class MergeManager<K, V> {
Counters.Counter reduceCombineInputCounter, Counters.Counter reduceCombineInputCounter,
Counters.Counter mergedMapOutputsCounter, Counters.Counter mergedMapOutputsCounter,
ExceptionReporter exceptionReporter, ExceptionReporter exceptionReporter,
Progress mergePhase) { Progress mergePhase, MapOutputFile mapOutputFile) {
this.reduceId = reduceId; this.reduceId = reduceId;
this.jobConf = jobConf; this.jobConf = jobConf;
this.localDirAllocator = localDirAllocator; this.localDirAllocator = localDirAllocator;
@ -146,7 +146,7 @@ public class MergeManager<K, V> {
this.reduceCombineInputCounter = reduceCombineInputCounter; this.reduceCombineInputCounter = reduceCombineInputCounter;
this.spilledRecordsCounter = spilledRecordsCounter; this.spilledRecordsCounter = spilledRecordsCounter;
this.mergedMapOutputsCounter = mergedMapOutputsCounter; this.mergedMapOutputsCounter = mergedMapOutputsCounter;
this.mapOutputFile = new MapOutputFile(); this.mapOutputFile = mapOutputFile;
this.mapOutputFile.setConf(jobConf); this.mapOutputFile.setConf(jobConf);
this.localFS = localFS; this.localFS = localFS;

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
@ -75,7 +76,8 @@ public class Shuffle<K, V> implements ExceptionReporter {
TaskStatus status, TaskStatus status,
Progress copyPhase, Progress copyPhase,
Progress mergePhase, Progress mergePhase,
Task reduceTask) { Task reduceTask,
MapOutputFile mapOutputFile) {
this.reduceId = reduceId; this.reduceId = reduceId;
this.jobConf = jobConf; this.jobConf = jobConf;
this.umbilical = umbilical; this.umbilical = umbilical;
@ -95,7 +97,7 @@ public class Shuffle<K, V> implements ExceptionReporter {
spilledRecordsCounter, spilledRecordsCounter,
reduceCombineInputCounter, reduceCombineInputCounter,
mergedMapOutputsCounter, mergedMapOutputsCounter,
this, mergePhase); this, mergePhase, mapOutputFile);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -48,4 +48,8 @@
<name>mapreduce.jobtracker.persist.jobstatus.active</name> <name>mapreduce.jobtracker.persist.jobstatus.active</name>
<value>false</value> <value>false</value>
</property> </property>
<!-- NOTE(review): presumably names the MapOutputFile implementation used for
     task-local output files; confirm against the code that reads this key. -->
<property>
<name>mapreduce.task.local.output.class</name>
<value>org.apache.hadoop.mapred.MROutputFiles</value>
</property>
</configuration> </configuration>

View File

@ -293,7 +293,7 @@ public class TestMapRed extends Configured implements Tool {
) throws IOException { ) throws IOException {
if (first) { if (first) {
first = false; first = false;
MapOutputFile mapOutputFile = new MapOutputFile(); MapOutputFile mapOutputFile = new MROutputFiles();
mapOutputFile.setConf(conf); mapOutputFile.setConf(conf);
Path input = mapOutputFile.getInputFile(0); Path input = mapOutputFile.getInputFile(0);
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);

View File

@ -178,7 +178,7 @@ public class TestBinaryTokenFile {
jConf = mrCluster.createJobConf(); jConf = mrCluster.createJobConf();
// provide namenodes names for the job to get the delegation tokens for // provide namenodes names for the job to get the delegation tokens for
String nnUri = dfsCluster.getURI().toString(); String nnUri = dfsCluster.getURI(0).toString();
jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
// job tracker principal id.. // job tracker principal id..
jConf.set(JTConfig.JT_USER_NAME, "jt_id"); jConf.set(JTConfig.JT_USER_NAME, "jt_id");

View File

@ -150,6 +150,7 @@ public class TestTokenCache {
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set("hadoop.security.auth_to_local", "RULE:[2:$1]");
dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null); dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
jConf = new JobConf(conf); jConf = new JobConf(conf);
mrCluster = new MiniMRCluster(0, 0, numSlaves, mrCluster = new MiniMRCluster(0, 0, numSlaves,
@ -224,10 +225,10 @@ public class TestTokenCache {
jConf = mrCluster.createJobConf(); jConf = mrCluster.createJobConf();
// provide namenodes names for the job to get the delegation tokens for // provide namenodes names for the job to get the delegation tokens for
String nnUri = dfsCluster.getURI().toString(); String nnUri = dfsCluster.getURI(0).toString();
jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
// job tracker principal id.. // job tracker principal id..
jConf.set(JTConfig.JT_USER_NAME, "jt_id"); jConf.set(JTConfig.JT_USER_NAME, "jt_id/foo@BAR");
// using argument to pass the file name // using argument to pass the file name
String[] args = { String[] args = {
@ -361,4 +362,20 @@ public class TestTokenCache {
} }
} }
/**
 * Checks that {@code TokenCache.getJTPrincipal} replaces the host name
 * pattern in the configured JobTracker principal with the host name taken
 * from the JobTracker address.
 * @throws IOException
 */
@Test
public void testGetJTPrincipal() throws IOException {
  final String service = "jt/";
  final String host = "foo";
  final String realm = "@BAR";
  // Principal as it is written in the configuration, still carrying the pattern.
  final String configuredPrincipal =
      service + SecurityUtil.HOSTNAME_PATTERN + realm;
  // Principal expected once the pattern is replaced by the host name.
  final String expectedPrincipal = service + host + realm;

  Configuration conf = new Configuration();
  conf.set(JTConfig.JT_IPC_ADDRESS, host + ":8888");
  conf.set(JTConfig.JT_USER_NAME, configuredPrincipal);

  String actualPrincipal = TokenCache.getJTPrincipal(conf);
  assertEquals("Failed to substitute HOSTNAME_PATTERN with hostName",
      expectedPrincipal, actualPrincipal);
}
} }

View File

@ -108,7 +108,7 @@ public class TestMapredGroupMappingServiceRefresh {
cluster = new MiniDFSCluster(0, config, 1, true, true, true, null, null, cluster = new MiniDFSCluster(0, config, 1, true, true, true, null, null,
null, null); null, null);
cluster.waitActive(); cluster.waitActive();
URI uri = cluster.getURI(); URI uri = cluster.getURI(0);
MiniMRCluster miniMRCluster = new MiniMRCluster(0, uri.toString() , MiniMRCluster miniMRCluster = new MiniMRCluster(0, uri.toString() ,
3, null, null, config); 3, null, null, config);

View File

@ -45,6 +45,7 @@
<%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %> <%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %>
<% <%
String logFile = request.getParameter("logFile"); String logFile = request.getParameter("logFile");
String reasonforFailure = " ";
final Path jobFile = new Path(logFile); final Path jobFile = new Path(logFile);
String jobid = JobHistory.getJobIDFromHistoryFilePath(jobFile).toString(); String jobid = JobHistory.getJobIDFromHistoryFilePath(jobFile).toString();
@ -55,6 +56,8 @@
if (job == null) { if (job == null) {
return; return;
} }
// Only a failed job carries a failure reason. The job status may be null for
// an incomplete job (the Status row of this page renders that case as
// "Incomplete"), so the constant is used as the receiver of equals() to avoid
// a NullPointerException that would otherwise abort rendering of the page.
if ("FAILED".equals(job.getJobStatus())) {
  reasonforFailure = job.getErrorInfo();
}
%> %>
<html> <html>
@ -78,6 +81,7 @@
<b>Launched At: </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLaunchTime(), job.getSubmitTime()) %><br/> <b>Launched At: </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLaunchTime(), job.getSubmitTime()) %><br/>
<b>Finished At: </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getFinishTime(), job.getLaunchTime()) %><br/> <b>Finished At: </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getFinishTime(), job.getLaunchTime()) %><br/>
<b>Status: </b> <%= ((job.getJobStatus()) == null ? "Incomplete" :job.getJobStatus()) %><br/> <b>Status: </b> <%= ((job.getJobStatus()) == null ? "Incomplete" :job.getJobStatus()) %><br/>
<b>ReasonForFailure: </b> <%=reasonforFailure %><br/>
<% <%
HistoryViewer.SummarizedJob sj = new HistoryViewer.SummarizedJob(job); HistoryViewer.SummarizedJob sj = new HistoryViewer.SummarizedJob(job);
%> %>