From ded6f225a55517deedc2bd502f2b68f1ca2ddee8 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Fri, 12 Aug 2011 21:00:17 +0000 Subject: [PATCH] MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279 merge. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1157249 13f79535-47bb-0310-9956-ffa450edef68 --- mapreduce/CHANGES.txt | 4 + .../examples/terasort/TeraInputFormat.java | 17 +- .../org/apache/hadoop/mapred/ACLsManager.java | 12 +- .../apache/hadoop/mapred/JobACLsManager.java | 6 +- .../apache/hadoop/mapred/LocalJobRunner.java | 4 +- .../apache/hadoop/mapred/MROutputFiles.java | 226 ++++++++++++++++++ .../apache/hadoop/mapred/MapOutputFile.java | 129 ++++------ .../org/apache/hadoop/mapred/MapTask.java | 6 +- .../org/apache/hadoop/mapred/ReduceTask.java | 3 +- .../java/org/apache/hadoop/mapred/Task.java | 6 +- .../mapred/TaskMemoryManagerThread.java | 16 +- .../org/apache/hadoop/mapred/TaskTracker.java | 4 + .../java/org/apache/hadoop/mapreduce/Job.java | 33 ++- .../org/apache/hadoop/mapreduce/MRConfig.java | 2 + .../jobhistory/JobHistoryParser.java | 4 + .../jobhistory/TaskFinishedEvent.java | 2 +- .../hadoop/mapreduce/security/TokenCache.java | 20 +- .../mapreduce/task/reduce/MergeManager.java | 4 +- .../hadoop/mapreduce/task/reduce/Shuffle.java | 6 +- mapreduce/src/test/mapred-site.xml | 4 + .../org/apache/hadoop/mapred/TestMapRed.java | 2 +- .../security/TestBinaryTokenFile.java | 2 +- .../mapreduce/security/TestTokenCache.java | 21 +- .../TestMapredGroupMappingServiceRefresh.java | 2 +- .../src/webapps/job/jobdetailshistory.jsp | 4 + 25 files changed, 416 insertions(+), 123 deletions(-) create mode 100644 mapreduce/src/java/org/apache/hadoop/mapred/MROutputFiles.java diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt index 3d3d9b84532..85748bff42a 100644 --- a/mapreduce/CHANGES.txt +++ b/mapreduce/CHANGES.txt @@ -387,6 +387,10 @@ Trunk (unreleased changes) MAPREDUCE-2797. Update mapreduce tests and RAID for HDFS-2239. (szetszwo) MAPREDUCE-2805. Update RAID for HDFS-2241. (szetszwo) + + MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279 + merge. (acmurthy) + Release 0.22.0 - Unreleased diff --git a/mapreduce/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java b/mapreduce/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java index 1c58ddc43a3..a381aba913f 100644 --- a/mapreduce/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java +++ b/mapreduce/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java @@ -61,19 +61,32 @@ public class TeraInputFormat extends FileInputFormat { private static List lastResult = null; static class TeraFileSplit extends FileSplit { + static private String[] ZERO_LOCATIONS = new String[0]; + private String[] locations; - public TeraFileSplit() {} + + public TeraFileSplit() { + locations = ZERO_LOCATIONS; + } public TeraFileSplit(Path file, long start, long length, String[] hosts) { super(file, start, length, hosts); - locations = hosts; + try { + locations = super.getLocations(); + } catch (IOException e) { + locations = ZERO_LOCATIONS; + } } + + // XXXXXX should this also be null-protected? 
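+    // (the constructors above already fall back to ZERO_LOCATIONS, so guarding
+    // with "hosts == null ? ZERO_LOCATIONS : hosts" here would keep
+    // getLocations() null-safe on every path)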
protected void setLocations(String[] hosts) { locations = hosts; } + @Override public String[] getLocations() { return locations; } + public String toString() { StringBuffer result = new StringBuffer(); result.append(getPath()); diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/ACLsManager.java b/mapreduce/src/java/org/apache/hadoop/mapred/ACLsManager.java index 32cdc903ca9..a186e982b03 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/ACLsManager.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/ACLsManager.java @@ -36,7 +36,7 @@ * QueueManager for queue operations. */ @InterfaceAudience.Private -class ACLsManager { +public class ACLsManager { static Log LOG = LogFactory.getLog(ACLsManager.class); // MROwner(user who started this mapreduce cluster)'s ugi @@ -49,7 +49,7 @@ class ACLsManager { private final boolean aclsEnabled; - ACLsManager(Configuration conf, JobACLsManager jobACLsManager, + public ACLsManager(Configuration conf, JobACLsManager jobACLsManager, QueueManager queueManager) throws IOException { mrOwner = UserGroupInformation.getCurrentUser(); @@ -68,7 +68,7 @@ class ACLsManager { this.queueManager = queueManager; } - UserGroupInformation getMROwner() { + public UserGroupInformation getMROwner() { return mrOwner; } @@ -76,7 +76,7 @@ AccessControlList getAdminsAcl() { return adminAcl; } - JobACLsManager getJobACLsManager() { + public JobACLsManager getJobACLsManager() { return jobACLsManager; } @@ -85,7 +85,7 @@ JobACLsManager getJobACLsManager() { * i.e. either cluster owner or cluster administrator * @return true, if user is an admin */ - boolean isMRAdmin(UserGroupInformation callerUGI) { + public boolean isMRAdmin(UserGroupInformation callerUGI) { if (adminAcl.isUserAllowed(callerUGI)) { return true; } @@ -111,7 +111,7 @@ boolean isMRAdmin(UserGroupInformation callerUGI) { * @param operation the operation for which authorization is needed * @throws AccessControlException */ - void checkAccess(JobInProgress job, UserGroupInformation callerUGI, + public void checkAccess(JobInProgress job, UserGroupInformation callerUGI, Operation operation) throws AccessControlException { String queue = job.getProfile().getQueueName(); diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobACLsManager.java b/mapreduce/src/java/org/apache/hadoop/mapred/JobACLsManager.java index 9e9ad1ea99e..7a9a5f53cfa 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/JobACLsManager.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/JobACLsManager.java @@ -29,7 +29,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; @InterfaceAudience.Private -class JobACLsManager { +public class JobACLsManager { Configuration conf; @@ -37,7 +37,7 @@ public JobACLsManager(Configuration conf) { this.conf = conf; } - boolean areACLsEnabled() { + public boolean areACLsEnabled() { return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false); } @@ -86,7 +86,7 @@ Map constructJobACLs(Configuration conf) { * @param jobACL * @throws AccessControlException */ - boolean checkAccess(UserGroupInformation callerUGI, + public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation, String jobOwner, AccessControlList jobACL) { String user = callerUGI.getShortUserName(); diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java b/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java index d28faca70dc..4d05e406177 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ 
b/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -240,7 +240,7 @@ public void run() { getShortUserName()); TaskRunner.setupChildMapredLocalDirs(map, localConf); - MapOutputFile mapOutput = new MapOutputFile(); + MapOutputFile mapOutput = new MROutputFiles(); mapOutput.setConf(localConf); mapOutputFiles.put(mapId, mapOutput); @@ -404,7 +404,7 @@ public void run() { if (!this.isInterrupted()) { TaskAttemptID mapId = mapIds.get(i); Path mapOut = mapOutputFiles.get(mapId).getOutputFile(); - MapOutputFile localOutputFile = new MapOutputFile(); + MapOutputFile localOutputFile = new MROutputFiles(); localOutputFile.setConf(localConf); Path reduceIn = localOutputFile.getInputFileForWrite(mapId.getTaskID(), diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/MROutputFiles.java b/mapreduce/src/java/org/apache/hadoop/mapred/MROutputFiles.java new file mode 100644 index 00000000000..3f64a10e307 --- /dev/null +++ b/mapreduce/src/java/org/apache/hadoop/mapred/MROutputFiles.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRConfig; + +/** + * Manipulate the working area for the transient store for maps and reduces. + * + * This class is used by map and reduce tasks to identify the directories that + * they need to write to/read from for intermediate files. The callers of + * these methods are from the Child running the Task. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class MROutputFiles extends MapOutputFile { + + private LocalDirAllocator lDirAlloc = + new LocalDirAllocator(MRConfig.LOCAL_DIR); + + public MROutputFiles() { + } + + /** + * Return the path to local map output file created earlier + * + * @return path + * @throws IOException + */ + @Override + public Path getOutputFile() + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR + + MAP_OUTPUT_FILENAME_STRING, getConf()); + } + + /** + * Create a local map output file name. + * + * @param size the size of the file + * @return path + * @throws IOException + */ + @Override + public Path getOutputFileForWrite(long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR + + MAP_OUTPUT_FILENAME_STRING, size, getConf()); + } + + /** + * Create a local map output file name on the same volume. 
+ */ + @Override + public Path getOutputFileForWriteInVolume(Path existing) { + return new Path(existing.getParent(), MAP_OUTPUT_FILENAME_STRING); + } + + /** + * Return the path to a local map output index file created earlier + * + * @return path + * @throws IOException + */ + @Override + public Path getOutputIndexFile() + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR + + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, + getConf()); + } + + /** + * Create a local map output index file name. + * + * @param size the size of the file + * @return path + * @throws IOException + */ + @Override + public Path getOutputIndexFileForWrite(long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR + + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, + size, getConf()); + } + + /** + * Create a local map output index file name on the same volume. + */ + @Override + public Path getOutputIndexFileForWriteInVolume(Path existing) { + return new Path(existing.getParent(), + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING); + } + + /** + * Return a local map spill file created earlier. + * + * @param spillNumber the number + * @return path + * @throws IOException + */ + @Override + public Path getSpillFile(int spillNumber) + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out", getConf()); + } + + /** + * Create a local map spill file name. + * + * @param spillNumber the number + * @param size the size of the file + * @return path + * @throws IOException + */ + @Override + public Path getSpillFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out", size, getConf()); + } + + /** + * Return a local map spill index file created earlier + * + * @param spillNumber the number + * @return path + * @throws IOException + */ + @Override + public Path getSpillIndexFile(int spillNumber) + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out.index", getConf()); + } + + /** + * Create a local map spill index file name. + * + * @param spillNumber the number + * @param size the size of the file + * @return path + * @throws IOException + */ + @Override + public Path getSpillIndexFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out.index", size, getConf()); + } + + /** + * Return a local reduce input file created earlier + * + * @param mapId a map task id + * @return path + * @throws IOException + */ + @Override + public Path getInputFile(int mapId) + throws IOException { + return lDirAlloc.getLocalPathToRead(String.format( + REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer + .valueOf(mapId)), getConf()); + } + + /** + * Create a local reduce input file name. + * + * @param mapId a map task id + * @param size the size of the file + * @return path + * @throws IOException + */ + @Override + public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, + long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(String.format( + REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()), + size, getConf()); + } + + /** Removes all of the files related to a task. 
*/ + @Override + public void removeAll() + throws IOException { + ((JobConf)getConf()).deleteLocalFiles(TaskTracker.OUTPUT); + } + + @Override + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + conf = new JobConf(conf); + } + super.setConf(conf); + } + +} diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java b/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java index 3d4511b40b9..b5ff73eb1bd 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java @@ -23,9 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.MRConfig; /** * Manipulate the working area for the transient store for maps and reduces. @@ -38,164 +37,132 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class MapOutputFile { +public abstract class MapOutputFile implements Configurable { - private JobConf conf; + private Configuration conf; + static final String MAP_OUTPUT_FILENAME_STRING = "file.out"; + static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index"; static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out"; public MapOutputFile() { } - private LocalDirAllocator lDirAlloc = - new LocalDirAllocator(MRConfig.LOCAL_DIR); - /** * Return the path to local map output file created earlier - * + * * @return path * @throws IOException */ - public Path getOutputFile() - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR - + "file.out", conf); - } + public abstract Path getOutputFile() throws IOException; /** * Create a local map output file name. - * + * * @param size the size of the file * @return path * @throws IOException */ - public Path getOutputFileForWrite(long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR - + "file.out", size, conf); - } + public abstract Path getOutputFileForWrite(long size) throws IOException; + + /** + * Create a local map output file name on the same volume. + */ + public abstract Path getOutputFileForWriteInVolume(Path existing); /** * Return the path to a local map output index file created earlier - * + * * @return path * @throws IOException */ - public Path getOutputIndexFile() - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR - + "file.out.index", conf); - } + public abstract Path getOutputIndexFile() throws IOException; /** * Create a local map output index file name. - * + * * @param size the size of the file * @return path * @throws IOException */ - public Path getOutputIndexFileForWrite(long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR - + "file.out.index", size, conf); - } + public abstract Path getOutputIndexFileForWrite(long size) throws IOException; + + /** + * Create a local map output index file name on the same volume. + */ + public abstract Path getOutputIndexFileForWriteInVolume(Path existing); /** * Return a local map spill file created earlier. 
- * + * * @param spillNumber the number * @return path * @throws IOException */ - public Path getSpillFile(int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill" - + spillNumber + ".out", conf); - } + public abstract Path getSpillFile(int spillNumber) throws IOException; /** * Create a local map spill file name. - * + * * @param spillNumber the number * @param size the size of the file * @return path * @throws IOException */ - public Path getSpillFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" - + spillNumber + ".out", size, conf); - } + public abstract Path getSpillFileForWrite(int spillNumber, long size) + throws IOException; /** * Return a local map spill index file created earlier - * + * * @param spillNumber the number * @return path * @throws IOException */ - public Path getSpillIndexFile(int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill" - + spillNumber + ".out.index", conf); - } + public abstract Path getSpillIndexFile(int spillNumber) throws IOException; /** * Create a local map spill index file name. - * + * * @param spillNumber the number * @param size the size of the file * @return path * @throws IOException */ - public Path getSpillIndexFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" - + spillNumber + ".out.index", size, conf); - } + public abstract Path getSpillIndexFileForWrite(int spillNumber, long size) + throws IOException; /** * Return a local reduce input file created earlier - * + * * @param mapId a map task id * @return path - * @throws IOException + * @throws IOException */ - public Path getInputFile(int mapId) - throws IOException { - return lDirAlloc.getLocalPathToRead(String.format( - REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer - .valueOf(mapId)), conf); - } + public abstract Path getInputFile(int mapId) throws IOException; /** * Create a local reduce input file name. - * + * * @param mapId a map task id * @param size the size of the file * @return path * @throws IOException */ - public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, - long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(String.format( - REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()), - size, conf); - } + public abstract Path getInputFileForWrite( + org.apache.hadoop.mapreduce.TaskID mapId, long size) throws IOException; /** Removes all of the files related to a task. 
*/ - public void removeAll() - throws IOException { - conf.deleteLocalFiles(TaskTracker.OUTPUT); + public abstract void removeAll() throws IOException; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; } - public void setConf(Configuration conf) { - if (conf instanceof JobConf) { - this.conf = (JobConf) conf; - } else { - this.conf = new JobConf(conf); - } + @Override + public Configuration getConf() { + return conf; } - + } diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java index 951b45ae70f..d1bff52ef3f 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java @@ -1735,13 +1735,13 @@ private void mergeParts() throws IOException, InterruptedException, } if (numSpills == 1) { //the spill is the final output rfs.rename(filename[0], - new Path(filename[0].getParent(), "file.out")); + mapOutputFile.getOutputFileForWriteInVolume(filename[0])); if (indexCacheList.size() == 0) { rfs.rename(mapOutputFile.getSpillIndexFile(0), - new Path(filename[0].getParent(),"file.out.index")); + mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0])); } else { indexCacheList.get(0).writeToFile( - new Path(filename[0].getParent(),"file.out.index"), job); + mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job); } return; } diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java b/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java index 6256c662730..5e6822a0869 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java @@ -362,7 +362,8 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical) shuffledMapsCounter, reduceShuffleBytes, failedShuffleCounter, mergedMapOutputsCounter, - taskStatus, copyPhase, sortPhase, this); + taskStatus, copyPhase, sortPhase, this, + mapOutputFile); rIter = shuffle.run(); } else { // local job runner doesn't have a copy phase diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java index 8ad56a7d051..e71209b83da 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java @@ -146,7 +146,7 @@ static synchronized String getOutputName(int partition) { private long initCpuCumulativeTime = 0; protected JobConf conf; - protected MapOutputFile mapOutputFile = new MapOutputFile(); + protected MapOutputFile mapOutputFile; protected LocalDirAllocator lDirAlloc; private final static int MAX_RETRIES = 10; protected JobContext jobContext; @@ -1150,7 +1150,9 @@ public void setConf(Configuration conf) { } else { this.conf = new JobConf(conf); } - this.mapOutputFile.setConf(this.conf); + this.mapOutputFile = ReflectionUtils.newInstance( + conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, + MROutputFiles.class, MapOutputFile.class), conf); this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR); // add the static resolutions (this is required for the junit to // work on testcases that simulate multiple nodes on a single physical diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java b/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java index bf78fe12dd3..caf6dcbce34 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java +++ 
b/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java @@ -227,8 +227,12 @@ public void run() { continue; // processTree cannot be tracked } - if (taskTracker.runningTasks.get(tid).wasKilled()) { - continue; // this task has been killed already + TaskInProgress tip = taskTracker.getRunningTask(tid); + if (tip == null) { + continue; + } + if (tip.wasKilled()) { + continue; } LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = " @@ -514,6 +518,12 @@ public int compare(TaskAttemptID tid1, TaskAttemptID tid2) { * @param msg diagnostics message */ private void killTask(TaskAttemptID tid, String msg) { + TaskInProgress tip = taskTracker.getRunningTask(tid); + if (tip != null) { + //for the task identified to be killed update taskDiagnostic + TaskStatus taskStatus = tip.getStatus(); + taskStatus.setDiagnosticInfo(msg); + } // Kill the task and mark it as killed. taskTracker.cleanUpOverMemoryTask(tid, false, msg); // Now destroy the ProcessTree, remove it from monitoring map. @@ -530,7 +540,7 @@ private void killTask(TaskAttemptID tid, String msg) { * @return true if the task can be killed */ private boolean isKillable(TaskAttemptID tid) { - TaskInProgress tip = taskTracker.runningTasks.get(tid); + TaskInProgress tip = taskTracker.getRunningTask(tid); return tip != null && !tip.wasKilled() && (tip.getRunState() == TaskStatus.State.RUNNING || tip.getRunState() == TaskStatus.State.COMMIT_PENDING); diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java b/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java index 52ea9162b19..40f85682524 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java @@ -4218,4 +4218,8 @@ JobACLsManager getJobACLsManager() { ACLsManager getACLsManager() { return aclsManager; } + + synchronized TaskInProgress getRunningTask(TaskAttemptID tid) { + return runningTasks.get(tid); + } } diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java index 0be084077d4..96c2bc53a19 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java @@ -122,7 +122,7 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } private JobStatus status; private long statustime; private Cluster cluster; - + @Deprecated public Job() throws IOException { this(new Configuration()); @@ -360,8 +360,11 @@ public boolean isRetired() throws IOException, InterruptedException { @Override public String toString() { ensureState(JobState.RUNNING); + String reasonforFailure = " "; try { updateStatus(); + if (status.getState().equals(JobStatus.State.FAILED)) + reasonforFailure = getTaskFailureEventString(); } catch (IOException e) { } catch (InterruptedException ie) { } @@ -378,10 +381,34 @@ public String toString() { sb.append(status.getState()).append("\n"); sb.append("history URL: "); sb.append(status.getHistoryFile()).append("\n"); - sb.append("retired: ").append(status.isRetired()); + sb.append("retired: ").append(status.isRetired()).append("\n"); + sb.append("reason for failure: ").append(reasonforFailure); return sb.toString(); } - + + /** + * @return taskid which caused job failure + * @throws IOException + * @throws InterruptedException + */ + String getTaskFailureEventString() throws IOException, + InterruptedException { + int failCount = 1; + TaskCompletionEvent lastEvent = null; + for (TaskCompletionEvent 
event : cluster.getClient().getTaskCompletionEvents( + status.getJobID(), 0, 10)) { + if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) { + failCount++; + lastEvent = event; + } + } + String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2); + String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2); + return (" task " + taskID + " failed " + + failCount + " times " + "For details check tasktracker at: " + + lastEvent.getTaskTrackerHttp()); + } + /** * Get the information of the current state of the tasks of a job. * diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java index 9e88ea182ea..6f4a1628739 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java @@ -59,4 +59,6 @@ public interface MRConfig { 7*24*60*60*1000; // 7 days public static final String FRAMEWORK_NAME = "mapreduce.framework.name"; + public static final String TASK_LOCAL_OUTPUT_CLASS = + "mapreduce.task.local.output.class"; } diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index 130622488a1..6fa35035996 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -262,6 +262,8 @@ private void handleTaskFailedEvent(TaskFailedEvent event) { taskInfo.finishTime = event.getFinishTime(); taskInfo.error = event.getError(); taskInfo.failedDueToAttemptId = event.getFailedAttemptID(); + info.errorInfo = "Task " + taskInfo.taskId +" failed " + + taskInfo.attemptsMap.size() + " times "; } private void handleTaskStartedEvent(TaskStartedEvent event) { @@ -321,6 +323,7 @@ private void handleJobSubmittedEvent(JobSubmittedEvent event) { * The class where job information is aggregated into after parsing */ public static class JobInfo { + String errorInfo = "None"; long submitTime; long finishTime; JobID jobid; @@ -406,6 +409,7 @@ public void printAll() { public long getFinishedReduces() { return finishedReduces; } /** Get the job status */ public String getJobStatus() { return jobStatus; } + public String getErrorInfo() { return errorInfo; } /** Get the counters for the job */ public Counters getTotalCounters() { return totalCounters; } /** Get the map counters for the job */ diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java index 2566677b6a5..d5e006b776a 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java @@ -67,7 +67,7 @@ public void setDatum(Object datum) { /** Get the task finish time */ public long getFinishTime() { return datum.finishTime; } /** Get task counters */ - Counters getCounters() { return EventReader.fromAvro(datum.counters); } + public Counters getCounters() { return EventReader.fromAvro(datum.counters); } /** Get task type */ public TaskType getTaskType() { return TaskType.valueOf(datum.taskType.toString()); diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java index b39d90be789..0a42f5179d1 100644 --- 
a/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java @@ -34,10 +34,10 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobTracker; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.KerberosName; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -92,6 +92,13 @@ static void obtainTokensForNamenodesInternal(Credentials credentials, } } + static String getJTPrincipal(Configuration conf) throws IOException { + String jtHostname = JobTracker.getAddress(conf).getHostName(); + // get jobtracker principal for use as delegation token renewer + return SecurityUtil.getServerPrincipal(conf.get(JTConfig.JT_USER_NAME), + jtHostname); + } + /** * get delegation token for a specific FS * @param fs @@ -102,12 +109,11 @@ static void obtainTokensForNamenodesInternal(Credentials credentials, */ static void obtainTokensForNamenodesInternal(FileSystem fs, Credentials credentials, Configuration conf) throws IOException { - - // get jobtracker principal id (for the renewer) - KerberosName jtKrbName = - new KerberosName(conf.get(JTConfig.JT_USER_NAME,"")); - - String delegTokenRenewer = jtKrbName.getShortName(); + String delegTokenRenewer = getJTPrincipal(conf); + if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { + throw new IOException( + "Can't get JobTracker Kerberos principal for use as renewer"); + } boolean readFile = true; String fsName = fs.getCanonicalServiceName(); diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java index c0a8207356a..83b4d650987 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java @@ -133,7 +133,7 @@ public MergeManager(TaskAttemptID reduceId, JobConf jobConf, Counters.Counter reduceCombineInputCounter, Counters.Counter mergedMapOutputsCounter, ExceptionReporter exceptionReporter, - Progress mergePhase) { + Progress mergePhase, MapOutputFile mapOutputFile) { this.reduceId = reduceId; this.jobConf = jobConf; this.localDirAllocator = localDirAllocator; @@ -146,7 +146,7 @@ public MergeManager(TaskAttemptID reduceId, JobConf jobConf, this.reduceCombineInputCounter = reduceCombineInputCounter; this.spilledRecordsCounter = spilledRecordsCounter; this.mergedMapOutputsCounter = mergedMapOutputsCounter; - this.mapOutputFile = new MapOutputFile(); + this.mapOutputFile = mapOutputFile; this.mapOutputFile.setConf(jobConf); this.localFS = localFS; diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index 177a24841b4..4b8b854952c 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -28,6 +28,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.JobConf; +import 
org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; @@ -75,7 +76,8 @@ public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, TaskStatus status, Progress copyPhase, Progress mergePhase, - Task reduceTask) { + Task reduceTask, + MapOutputFile mapOutputFile) { this.reduceId = reduceId; this.jobConf = jobConf; this.umbilical = umbilical; @@ -95,7 +97,7 @@ public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, spilledRecordsCounter, reduceCombineInputCounter, mergedMapOutputsCounter, - this, mergePhase); + this, mergePhase, mapOutputFile); } @SuppressWarnings("unchecked") diff --git a/mapreduce/src/test/mapred-site.xml b/mapreduce/src/test/mapred-site.xml index 6773c40dc57..4874e61be54 100644 --- a/mapreduce/src/test/mapred-site.xml +++ b/mapreduce/src/test/mapred-site.xml @@ -48,4 +48,8 @@ mapreduce.jobtracker.persist.jobstatus.active false + + mapreduce.task.local.output.class + org.apache.hadoop.mapred.MROutputFiles + diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java index b2b1af98523..afc0cdc0fa9 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java @@ -293,7 +293,7 @@ public void reduce(WritableComparable key, Iterator values, ) throws IOException { if (first) { first = false; - MapOutputFile mapOutputFile = new MapOutputFile(); + MapOutputFile mapOutputFile = new MROutputFiles(); mapOutputFile.setConf(conf); Path input = mapOutputFile.getInputFile(0); FileSystem fs = FileSystem.get(conf); diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java index 9678480f74c..fc53324d7c3 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java @@ -178,7 +178,7 @@ public void testBinaryTokenFile() throws IOException { jConf = mrCluster.createJobConf(); // provide namenodes names for the job to get the delegation tokens for - String nnUri = dfsCluster.getURI().toString(); + String nnUri = dfsCluster.getURI(0).toString(); jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); // job tracker principla id.. 
jConf.set(JTConfig.JT_USER_NAME, "jt_id"); diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java index a59dc53a1a7..8e9cb2cb591 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java @@ -150,6 +150,7 @@ private void populateTokens(Job job) { @BeforeClass public static void setUp() throws Exception { Configuration conf = new Configuration(); + conf.set("hadoop.security.auth_to_local", "RULE:[2:$1]"); dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null); jConf = new JobConf(conf); mrCluster = new MiniMRCluster(0, 0, numSlaves, @@ -224,10 +225,10 @@ public void testTokenCache() throws IOException { jConf = mrCluster.createJobConf(); // provide namenodes names for the job to get the delegation tokens for - String nnUri = dfsCluster.getURI().toString(); + String nnUri = dfsCluster.getURI(0).toString(); jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); // job tracker principla id.. - jConf.set(JTConfig.JT_USER_NAME, "jt_id"); + jConf.set(JTConfig.JT_USER_NAME, "jt_id/foo@BAR"); // using argument to pass the file name String[] args = { @@ -361,4 +362,20 @@ public String answer(InvocationOnMock invocation) } } + /** + * verify _HOST substitution + * @throws IOException + */ + @Test + public void testGetJTPrincipal() throws IOException { + String serviceName = "jt/"; + String hostName = "foo"; + String domainName = "@BAR"; + Configuration conf = new Configuration(); + conf.set(JTConfig.JT_IPC_ADDRESS, hostName + ":8888"); + conf.set(JTConfig.JT_USER_NAME, serviceName + SecurityUtil.HOSTNAME_PATTERN + + domainName); + assertEquals("Failed to substitute HOSTNAME_PATTERN with hostName", + serviceName + hostName + domainName, TokenCache.getJTPrincipal(conf)); + } } diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java b/mapreduce/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java index b27bb487045..8b45220332b 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java @@ -108,7 +108,7 @@ public void setUp() throws Exception { cluster = new MiniDFSCluster(0, config, 1, true, true, true, null, null, null, null); cluster.waitActive(); - URI uri = cluster.getURI(); + URI uri = cluster.getURI(0); MiniMRCluster miniMRCluster = new MiniMRCluster(0, uri.toString() , 3, null, null, config); diff --git a/mapreduce/src/webapps/job/jobdetailshistory.jsp b/mapreduce/src/webapps/job/jobdetailshistory.jsp index 68efc294969..2b0a014b3f5 100644 --- a/mapreduce/src/webapps/job/jobdetailshistory.jsp +++ b/mapreduce/src/webapps/job/jobdetailshistory.jsp @@ -45,6 +45,7 @@ <%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %> <% String logFile = request.getParameter("logFile"); + String reasonforFailure = " "; final Path jobFile = new Path(logFile); String jobid = JobHistory.getJobIDFromHistoryFilePath(jobFile).toString(); @@ -55,6 +56,8 @@ if (job == null) { return; } + if (job.getJobStatus().equals("FAILED")) + reasonforFailure = job.getErrorInfo(); %> @@ -78,6 +81,7 @@ Launched At: <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLaunchTime(), job.getSubmitTime()) %>
Finished At: <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getFinishTime(), job.getLaunchTime()) %>
Status: <%= ((job.getJobStatus()) == null ? "Incomplete" :job.getJobStatus()) %>
+Reason for Failure: <%= reasonforFailure %>
<% HistoryViewer.SummarizedJob sj = new HistoryViewer.SummarizedJob(job); %>
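
Note on the MapOutputFile refactoring above: MapOutputFile is now an abstract Configurable, MROutputFiles carries the stock LocalDirAllocator-backed layout, and Task.setConf() instantiates whichever class mapreduce.task.local.output.class (MRConfig.TASK_LOCAL_OUTPUT_CLASS) names via ReflectionUtils.newInstance(), falling back to MROutputFiles. A minimal sketch of plugging in a custom implementation follows; the LoggingOutputFiles class and its package are hypothetical illustrations, and only the config key and the two Hadoop classes come from this patch.

// Hypothetical example (not part of the patch): a MapOutputFile
// implementation that logs spill-file allocations. Extending MROutputFiles
// keeps the default directory layout and overrides a single method.
package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MROutputFiles;

public class LoggingOutputFiles extends MROutputFiles {

  @Override
  public Path getSpillFileForWrite(int spillNumber, long size)
      throws IOException {
    // Delegate to the stock allocation, then record where the spill went.
    Path spill = super.getSpillFileForWrite(spillNumber, size);
    System.err.println("spill " + spillNumber + " (" + size + " bytes) -> " + spill);
    return spill;
  }
}

Wiring it in is one call on the job configuration, mirroring the property that mapred-site.xml above sets for the default implementation:

  conf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, LoggingOutputFiles.class, MapOutputFile.class);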