diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5f504121f44..b878f0be35b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -282,6 +282,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3542. Support "FileSystemCounter" legacy counter group name for compatibility. (tomwhite) + MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs + in the correct directory to work properly in secure mode. (Hitesh Shah via + vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index d2400f053fe..cb3e80b8b21 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -22,20 +22,19 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.PrintStream; -import java.net.URI; import java.util.HashSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSError; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TypeConverter; @@ -47,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.app.job.Task; -import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.service.AbstractService; /** @@ -80,7 +78,10 @@ public class LocalContainerLauncher extends AbstractService implements super(LocalContainerLauncher.class.getName()); this.context = context; this.umbilical = umbilical; - // umbilical: MRAppMaster creates (taskAttemptListener), passes to us (TODO/FIXME: pointless to use RPC to talk to self; should create LocalTaskAttemptListener or similar: implement umbilical protocol but skip RPC stuff) + // umbilical: MRAppMaster creates (taskAttemptListener), passes to us + // (TODO/FIXME: pointless to use RPC to talk to self; should create + // LocalTaskAttemptListener or similar: implement umbilical protocol + // but skip RPC stuff) try { curFC = FileContext.getFileContext(curDir.toURI()); @@ -152,7 +153,6 @@ public class LocalContainerLauncher extends AbstractService implements * ]] * - runs Task (runSubMap() or runSubReduce()) * - TA can safely send TA_UPDATE since in RUNNING state - * [modulo possible TA-state-machine race noted below: CHECK (TODO)] */ private class SubtaskRunner implements Runnable { @@ -162,6 +162,7 @@ public class LocalContainerLauncher extends AbstractService implements SubtaskRunner() { } + @SuppressWarnings("unchecked") @Override public void run() { ContainerLauncherEvent event = null; @@ -183,7 +184,7 @@ public class LocalContainerLauncher extends AbstractService implements ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent)event; - TaskAttemptId attemptID = launchEv.getTaskAttemptID(); //FIXME: can attemptID ever be null? (only if retrieved over umbilical?) + TaskAttemptId attemptID = launchEv.getTaskAttemptID(); Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId()); int numMapTasks = job.getTotalMaps(); @@ -204,7 +205,6 @@ public class LocalContainerLauncher extends AbstractService implements // port number is set to -1 in this case. context.getEventHandler().handle( new TaskAttemptContainerLaunchedEvent(attemptID, -1)); - //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter) if (numMapTasks == 0) { doneWithMaps = true; @@ -259,6 +259,7 @@ public class LocalContainerLauncher extends AbstractService implements } } + @SuppressWarnings("deprecation") private void runSubtask(org.apache.hadoop.mapred.Task task, final TaskType taskType, TaskAttemptId attemptID, @@ -270,6 +271,19 @@ public class LocalContainerLauncher extends AbstractService implements try { JobConf conf = new JobConf(getConfig()); + conf.set(JobContext.TASK_ID, task.getTaskID().toString()); + conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString()); + conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP)); + conf.setInt(JobContext.TASK_PARTITION, task.getPartition()); + conf.set(JobContext.ID, task.getJobID().toString()); + + // Use the AM's local dir env to generate the intermediate step + // output files + String[] localSysDirs = StringUtils.getTrimmedStrings( + System.getenv(ApplicationConstants.LOCAL_DIR_ENV)); + conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs); + LOG.info(MRConfig.LOCAL_DIR + " for uber task: " + + conf.get(MRConfig.LOCAL_DIR)); // mark this as an uberized subtask so it can set task counter // (longer-term/FIXME: could redefine as job counter and send @@ -285,12 +299,12 @@ public class LocalContainerLauncher extends AbstractService implements if (doneWithMaps) { LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task (" + attemptID + "), but should be finished with maps"); - // throw new RuntimeException() (FIXME: what's appropriate here?) + throw new RuntimeException(); } MapTask map = (MapTask)task; + map.setConf(conf); - //CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first? map.run(conf, umbilical); if (renameOutputs) { @@ -305,19 +319,23 @@ public class LocalContainerLauncher extends AbstractService implements } else /* TaskType.REDUCE */ { if (!doneWithMaps) { - //check if event-queue empty? whole idea of counting maps vs. checking event queue is a tad wacky...but could enforce ordering (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): doesn't send reduce event until maps all done] + // check if event-queue empty? whole idea of counting maps vs. + // checking event queue is a tad wacky...but could enforce ordering + // (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): + // doesn't send reduce event until maps all done] LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task (" + attemptID + "), but not yet finished with maps"); - // throw new RuntimeException() (FIXME) // or push reduce event back onto end of queue? (probably former) + throw new RuntimeException(); } - ReduceTask reduce = (ReduceTask)task; - // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner: // set framework name to local to make task local conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle + ReduceTask reduce = (ReduceTask)task; + reduce.setConf(conf); + reduce.run(conf, umbilical); //relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?) } @@ -334,18 +352,7 @@ public class LocalContainerLauncher extends AbstractService implements try { if (task != null) { // do cleanup for the task -// if (childUGI == null) { // no need to job into doAs block - task.taskCleanup(umbilical); -// } else { -// final Task taskFinal = task; -// childUGI.doAs(new PrivilegedExceptionAction() { -// @Override -// public Object run() throws Exception { -// taskFinal.taskCleanup(umbilical); -// return null; -// } -// }); -// } + task.taskCleanup(umbilical); } } catch (Exception e) { LOG.info("Exception cleaning up: " @@ -354,51 +361,21 @@ public class LocalContainerLauncher extends AbstractService implements // Report back any failures, for diagnostic purposes ByteArrayOutputStream baos = new ByteArrayOutputStream(); exception.printStackTrace(new PrintStream(baos)); -// if (classicAttemptID != null) { - umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString()); -// } + umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString()); throw new RuntimeException(); } catch (Throwable throwable) { LOG.fatal("Error running local (uberized) 'child' : " + StringUtils.stringifyException(throwable)); -// if (classicAttemptID != null) { - Throwable tCause = throwable.getCause(); - String cause = (tCause == null) - ? throwable.getMessage() - : StringUtils.stringifyException(tCause); - umbilical.fatalError(classicAttemptID, cause); -// } + Throwable tCause = throwable.getCause(); + String cause = (tCause == null) + ? throwable.getMessage() + : StringUtils.stringifyException(tCause); + umbilical.fatalError(classicAttemptID, cause); throw new RuntimeException(); - - } finally { -/* -FIXME: do we need to do any of this stuff? (guessing not since not in own JVM) - RPC.stopProxy(umbilical); - DefaultMetricsSystem.shutdown(); - // Shutting down log4j of the child-vm... - // This assumes that on return from Task.run() - // there is no more logging done. - LogManager.shutdown(); - */ } } - -/* FIXME: may not need renameMapOutputForReduce() anymore? TEST! - -${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for containers; -contains launch_container.sh script, which, when executed, creates symlinks and -sets up env - "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out - "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?) - "$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done - - OHO! no further need for this at all? $taskId is unique per subtask - now => should work fine to leave alone. TODO: test with teragen or - similar - */ - /** * Within the _local_ filesystem (not HDFS), all activity takes place within * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/), @@ -409,14 +386,21 @@ sets up env * filenames instead of "file.out". (All of this is entirely internal, * so there are no particular compatibility issues.) */ + @SuppressWarnings("deprecation") private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException { FileSystem localFs = FileSystem.getLocal(conf); // move map output to reduce input Path mapOut = subMapOutputFile.getOutputFile(); + FileStatus mStatus = localFs.getFileStatus(mapOut); Path reduceIn = subMapOutputFile.getInputFileForWrite( - TypeConverter.fromYarn(mapId).getTaskID(), localFs.getLength(mapOut)); + TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen()); + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming map output file for task attempt " + + mapId.toString() + " from original location " + mapOut.toString() + + " to destination " + reduceIn.toString()); + } if (!localFs.mkdirs(reduceIn.getParent())) { throw new IOException("Mkdirs failed to create " + reduceIn.getParent().toString()); @@ -429,8 +413,7 @@ sets up env * Also within the local filesystem, we need to restore the initial state * of the directory as much as possible. Compare current contents against * the saved original state and nuke everything that doesn't belong, with - * the exception of the renamed map outputs (see above). -FIXME: do we really need to worry about renamed map outputs, or already moved to output dir on commit? if latter, fix comment + * the exception of the renamed map outputs. * * Any jobs that go out of their way to rename or delete things from the * local directory are considered broken and deserve what they get... diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java index 689671a6fe0..de777119565 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java @@ -236,6 +236,13 @@ public class MapReduceChildJVM { getTaskLogFile(TaskLog.LogName.PROFILE) ) ); + if (task.isMapTask()) { + vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, "")); + } + else { + vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "")); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 800dfa9d365..f9de4bc5035 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -156,6 +156,7 @@ public class MRAppMaster extends CompositeService { private OutputCommitter committer; private JobEventDispatcher jobEventDispatcher; private boolean inRecovery = false; + private SpeculatorEventDispatcher speculatorEventDispatcher; private Job job; private Credentials fsTokens = new Credentials(); // Filled during init @@ -265,8 +266,9 @@ public class MRAppMaster extends CompositeService { addIfService(speculator); } + speculatorEventDispatcher = new SpeculatorEventDispatcher(conf); dispatcher.register(Speculator.EventType.class, - new SpeculatorEventDispatcher(conf)); + speculatorEventDispatcher); // service to allocate containers from RM (if non-uber) or to fake it (uber) containerAllocator = createContainerAllocator(clientService, context); @@ -386,7 +388,7 @@ public class MRAppMaster extends CompositeService { // This will also send the final report to the ResourceManager LOG.info("Calling stop for all the services"); stop(); - + // Send job-end notification try { LOG.info("Job end notification started for jobID : " @@ -401,14 +403,14 @@ public class MRAppMaster extends CompositeService { } catch (Throwable t) { LOG.warn("Graceful stop failed ", t); } - + // Cleanup staging directory try { cleanupStagingDir(); } catch(IOException io) { LOG.warn("Failed to delete staging dir"); } - + //Bring the process down by force. //Not needed after HADOOP-7140 LOG.info("Exiting MR AppMaster..GoodBye!"); @@ -790,10 +792,6 @@ public class MRAppMaster extends CompositeService { // job-init to be done completely here. jobEventDispatcher.handle(initJobEvent); - // send init to speculator. This won't yest start as dispatcher isn't - // started yet. - dispatcher.getEventHandler().handle( - new SpeculatorEvent(job.getID(), clock.getTime())); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to @@ -801,9 +799,15 @@ public class MRAppMaster extends CompositeService { // and container-launcher services/event-handlers). if (job.isUber()) { + speculatorEventDispatcher.disableSpeculation(); LOG.info("MRAppMaster uberizing job " + job.getID() - + " in local container (\"uber-AM\")."); + + " in local container (\"uber-AM\") on node " + + nmHost + ":" + nmPort + "."); } else { + // send init to speculator only for non-uber jobs. + // This won't yet start as dispatcher isn't started yet. + dispatcher.getEventHandler().handle( + new SpeculatorEvent(job.getID(), clock.getTime())); LOG.info("MRAppMaster launching normal, non-uberized, multi-container " + "job " + job.getID() + "."); } @@ -865,17 +869,24 @@ public class MRAppMaster extends CompositeService { private class SpeculatorEventDispatcher implements EventHandler { private final Configuration conf; + private volatile boolean disabled; public SpeculatorEventDispatcher(Configuration config) { this.conf = config; } @Override public void handle(SpeculatorEvent event) { - if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) - || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) { + if (!disabled && + (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) + || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) { // Speculator IS enabled, direct the event to there. speculator.handle(event); } } + + public void disableSpeculation() { + disabled = true; + } + } private static void validateInputParam(String value, String param) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index dd19ed07b9a..9291075e6d2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.chain.ChainReducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; @@ -583,13 +584,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, if (getState() == JobState.NEW) { return MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, - cleanupProgress, remoteJobConfFile.toString(), amInfos); + cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); } return MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, computeProgress(mapTasks), computeProgress(reduceTasks), - cleanupProgress, remoteJobConfFile.toString(), amInfos); + cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); } finally { readLock.unlock(); } @@ -812,6 +813,129 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, return amInfos; } + /** + * Decide whether job can be run in uber mode based on various criteria. + * @param dataInputLength Total length for all splits + */ + private void makeUberDecision(long dataInputLength) { + //FIXME: need new memory criterion for uber-decision (oops, too late here; + // until AM-resizing supported, + // must depend on job client to pass fat-slot needs) + // these are no longer "system" settings, necessarily; user may override + int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9); + + //FIXME: handling multiple reduces within a single AM does not seem to + //work. + // int sysMaxReduces = + // job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); + int sysMaxReduces = 1; + + long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, + conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is + // wrong; get FS from [File?]InputFormat and default block size from that + + long sysMemSizeForUberSlot = + conf.getInt(MRJobConfig.MR_AM_VMEM_MB, + MRJobConfig.DEFAULT_MR_AM_VMEM_MB); + + boolean uberEnabled = + conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps); + boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces); + boolean smallInput = (dataInputLength <= sysMaxBytes); + // ignoring overhead due to UberAM and statics as negligible here: + boolean smallMemory = + ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0), + conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0)) + <= sysMemSizeForUberSlot) + || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)); + boolean notChainJob = !isChainJob(conf); + + // User has overall veto power over uberization, or user can modify + // limits (overriding system settings and potentially shooting + // themselves in the head). Note that ChainMapper/Reducer are + // fundamentally incompatible with MR-1220; they employ a blocking + // queue between the maps/reduces and thus require parallel execution, + // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks + // and thus requires sequential execution. + isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks + && smallInput && smallMemory && notChainJob; + + if (isUber) { + LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+" + + numReduceTasks + "r tasks (" + dataInputLength + + " input bytes) will run sequentially on single node."); + + // make sure reduces are scheduled only after all map are completed + conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, + 1.0f); + // uber-subtask attempts all get launched on same node; if one fails, + // probably should retry elsewhere, i.e., move entire uber-AM: ergo, + // limit attempts to 1 (or at most 2? probably not...) + conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1); + conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1); + + // disable speculation + conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); + conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + } else { + StringBuilder msg = new StringBuilder(); + msg.append("Not uberizing ").append(jobId).append(" because:"); + if (!uberEnabled) + msg.append(" not enabled;"); + if (!smallNumMapTasks) + msg.append(" too many maps;"); + if (!smallNumReduceTasks) + msg.append(" too many reduces;"); + if (!smallInput) + msg.append(" too much input;"); + if (!smallMemory) + msg.append(" too much RAM;"); + if (!notChainJob) + msg.append(" chainjob"); + LOG.info(msg.toString()); + } + } + + /** + * ChainMapper and ChainReducer must execute in parallel, so they're not + * compatible with uberization/LocalContainerLauncher (100% sequential). + */ + private boolean isChainJob(Configuration conf) { + boolean isChainJob = false; + try { + String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR); + if (mapClassName != null) { + Class mapClass = Class.forName(mapClassName); + if (ChainMapper.class.isAssignableFrom(mapClass)) + isChainJob = true; + } + } catch (ClassNotFoundException cnfe) { + // don't care; assume it's not derived from ChainMapper + } + try { + String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR); + if (reduceClassName != null) { + Class reduceClass = Class.forName(reduceClassName); + if (ChainReducer.class.isAssignableFrom(reduceClass)) + isChainJob = true; + } + } catch (ClassNotFoundException cnfe) { + // don't care; assume it's not derived from ChainReducer + } + return isChainJob; + } + + /* + private int getBlockSize() { + String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR); + if (inputClassName != null) { + Class inputClass - Class.forName(inputClassName); + if (FileInputFormat) + } + } + */ + public static class InitTransition implements MultipleArcTransition { @@ -863,81 +987,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, inputLength += taskSplitMetaInfo[i].getInputDataLength(); } - //FIXME: need new memory criterion for uber-decision (oops, too late here; - // until AM-resizing supported, must depend on job client to pass fat-slot needs) - // these are no longer "system" settings, necessarily; user may override - int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9); - int sysMaxReduces = - job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); - long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, - job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is - // wrong; get FS from [File?]InputFormat and default block size from that - //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); - // FIXME [could use default AM-container memory size...] - - boolean uberEnabled = - job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); - boolean smallNumMapTasks = (job.numMapTasks <= sysMaxMaps); - boolean smallNumReduceTasks = (job.numReduceTasks <= sysMaxReduces); - boolean smallInput = (inputLength <= sysMaxBytes); - boolean smallMemory = true; //FIXME (see above) - // ignoring overhead due to UberTask and statics as negligible here: - // FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot - // || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT) - boolean notChainJob = !isChainJob(job.conf); - - // User has overall veto power over uberization, or user can modify - // limits (overriding system settings and potentially shooting - // themselves in the head). Note that ChainMapper/Reducer are - // fundamentally incompatible with MR-1220; they employ a blocking - - // User has overall veto power over uberization, or user can modify - // limits (overriding system settings and potentially shooting - // themselves in the head). Note that ChainMapper/Reducer are - // fundamentally incompatible with MR-1220; they employ a blocking - // queue between the maps/reduces and thus require parallel execution, - // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks - // and thus requires sequential execution. - job.isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks - && smallInput && smallMemory && notChainJob; - - if (job.isUber) { - LOG.info("Uberizing job " + job.jobId + ": " + job.numMapTasks + "m+" - + job.numReduceTasks + "r tasks (" + inputLength - + " input bytes) will run sequentially on single node."); - //TODO: also note which node? - - // make sure reduces are scheduled only after all map are completed - job.conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, - 1.0f); - // uber-subtask attempts all get launched on same node; if one fails, - // probably should retry elsewhere, i.e., move entire uber-AM: ergo, - // limit attempts to 1 (or at most 2? probably not...) - job.conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1); - job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1); - - // disable speculation: makes no sense to speculate an entire job - //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old - //version, ultimately was from conf.getMapSpeculativeExecution(), - //conf.getReduceSpeculativeExecution()] - } else { - StringBuilder msg = new StringBuilder(); - msg.append("Not uberizing ").append(job.jobId).append(" because:"); - if (!uberEnabled) - msg.append(" not enabled;"); - if (!smallNumMapTasks) - msg.append(" too many maps;"); - if (!smallNumReduceTasks) - msg.append(" too many reduces;"); - if (!smallInput) - msg.append(" too much input;"); - if (!smallMemory) - msg.append(" too much RAM;"); - if (!notChainJob) - msg.append(" chainjob"); - LOG.info(msg.toString()); - } - + job.makeUberDecision(inputLength); + job.taskAttemptCompletionEvents = new ArrayList( job.numMapTasks + job.numReduceTasks + 10); @@ -1008,35 +1059,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } - /** - * ChainMapper and ChainReducer must execute in parallel, so they're not - * compatible with uberization/LocalContainerLauncher (100% sequential). - */ - boolean isChainJob(Configuration conf) { - boolean isChainJob = false; - try { - String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR); - if (mapClassName != null) { - Class mapClass = Class.forName(mapClassName); - if (ChainMapper.class.isAssignableFrom(mapClass)) - isChainJob = true; - } - } catch (ClassNotFoundException cnfe) { - // don't care; assume it's not derived from ChainMapper - } - try { - String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR); - if (reduceClassName != null) { - Class reduceClass = Class.forName(reduceClassName); - if (ChainReducer.class.isAssignableFrom(reduceClass)) - isChainJob = true; - } - } catch (ClassNotFoundException cnfe) { - // don't care; assume it's not derived from ChainReducer - } - return isChainJob; - } - private void createMapTasks(JobImpl job, long inputLength, TaskSplitMetaInfo[] splits) { for (int i=0; i < job.numMapTasks; ++i) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index f0ce272bb8f..f9e58b7ecc4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -60,8 +60,8 @@ public class LocalContainerAllocator extends RMCommunicator private static final Log LOG = LogFactory.getLog(LocalContainerAllocator.class); + @SuppressWarnings("rawtypes") private final EventHandler eventHandler; -// private final ApplicationId appID; private AtomicInteger containerCount = new AtomicInteger(); private long retryInterval; private long retrystartTime; @@ -73,8 +73,6 @@ public class LocalContainerAllocator extends RMCommunicator AppContext context) { super(clientService, context); this.eventHandler = context.getEventHandler(); -// this.appID = context.getApplicationID(); - } @Override @@ -88,6 +86,7 @@ public class LocalContainerAllocator extends RMCommunicator retrystartTime = System.currentTimeMillis(); } + @SuppressWarnings("unchecked") @Override protected synchronized void heartbeat() throws Exception { AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( @@ -124,6 +123,7 @@ public class LocalContainerAllocator extends RMCommunicator } } + @SuppressWarnings("unchecked") @Override public void handle(ContainerAllocatorEvent event) { if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 68d9c2462b4..5028355acf3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import java.io.IOException; import java.security.PrivilegedAction; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,7 +66,7 @@ public abstract class RMCommunicator extends AbstractService { private int rmPollInterval;//millis protected ApplicationId applicationId; protected ApplicationAttemptId applicationAttemptId; - private volatile boolean stopped; + private AtomicBoolean stopped; protected Thread allocatorThread; protected EventHandler eventHandler; protected AMRMProtocol scheduler; @@ -88,6 +89,7 @@ public abstract class RMCommunicator extends AbstractService { this.eventHandler = context.getEventHandler(); this.applicationId = context.getApplicationID(); this.applicationAttemptId = context.getApplicationAttemptId(); + this.stopped = new AtomicBoolean(false); } @Override @@ -213,7 +215,10 @@ public abstract class RMCommunicator extends AbstractService { @Override public void stop() { - stopped = true; + if (stopped.getAndSet(true)) { + // return if already stopped + return; + } allocatorThread.interrupt(); try { allocatorThread.join(); @@ -228,7 +233,7 @@ public abstract class RMCommunicator extends AbstractService { allocatorThread = new Thread(new Runnable() { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { try { Thread.sleep(rmPollInterval); try { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java index 9c59269ec6f..27fcec2c6a2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java @@ -183,6 +183,7 @@ public class TestMRClientService { Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId() .getAttemptId()); Assert.assertTrue(amInfo.getStartTime() > 0); + Assert.assertEquals(false, jr.isUber()); } private void verifyTaskAttemptReport(TaskAttemptReport tar) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 812393c1b58..c9436e5645a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -118,7 +118,7 @@ public class TestRMContainerAllocator { Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, - 0, 0, 0, 0, 0, 0, "jobfile", null)); + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -195,7 +195,7 @@ public class TestRMContainerAllocator { Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, - 0, 0, 0, 0, 0, 0, "jobfile", null)); + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -261,7 +261,7 @@ public class TestRMContainerAllocator { Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, - 0, 0, 0, 0, 0, 0, "jobfile", null)); + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -375,7 +375,7 @@ public class TestRMContainerAllocator { public JobReport getReport() { return MRBuilderUtils.newJobReport(this.jobId, "job", "user", JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress, - this.reduceProgress, this.cleanupProgress, "jobfile", null); + this.reduceProgress, this.cleanupProgress, "jobfile", null, false); } } @@ -511,7 +511,7 @@ public class TestRMContainerAllocator { Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, - 0, 0, 0, 0, 0, 0, "jobfile", null)); + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -610,7 +610,7 @@ public class TestRMContainerAllocator { Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, - 0, 0, 0, 0, 0, 0, "jobfile", null)); + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index f20fbf934af..d3ebee62b2a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -288,7 +288,7 @@ public class TypeConverter { .getMapProgress(), jobreport.getReduceProgress(), jobreport .getCleanupProgress(), fromYarn(jobreport.getJobState()), jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport - .getJobFile(), trackingUrl); + .getJobFile(), trackingUrl, jobreport.isUber()); jobStatus.setFailureInfo(jobreport.getDiagnostics()); return jobStatus; } @@ -421,7 +421,7 @@ public class TypeConverter { TypeConverter.fromYarn(application.getYarnApplicationState()), org.apache.hadoop.mapreduce.JobPriority.NORMAL, application.getUser(), application.getName(), - application.getQueue(), jobFile, trackingUrl + application.getQueue(), jobFile, trackingUrl, false ); jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url jobStatus.setStartTime(application.getStartTime()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java index 469c425febf..b2f2cc1fc80 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java @@ -36,6 +36,7 @@ public interface JobReport { public abstract String getDiagnostics(); public abstract String getJobFile(); public abstract List getAMInfos(); + public abstract boolean isUber(); public abstract void setJobId(JobId jobId); public abstract void setJobState(JobState jobState); @@ -52,4 +53,5 @@ public interface JobReport { public abstract void setDiagnostics(String diagnostics); public abstract void setJobFile(String jobFile); public abstract void setAMInfos(List amInfos); + public abstract void setIsUber(boolean isUber); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java index 41e46c33915..1b16c864f80 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java @@ -332,4 +332,16 @@ public class JobReportPBImpl extends ProtoBase implements private JobState convertFromProtoFormat(JobStateProto e) { return MRProtoUtils.convertFromProtoFormat(e); } + + @Override + public synchronized boolean isUber() { + JobReportProtoOrBuilder p = viaProto ? proto : builder; + return p.getIsUber(); + } + + @Override + public synchronized void setIsUber(boolean isUber) { + maybeInitBuilder(); + builder.setIsUber(isUber); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java index 109028205da..2b5b21c8679 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java @@ -60,7 +60,8 @@ public class MRBuilderUtils { public static JobReport newJobReport(JobId jobId, String jobName, String userName, JobState state, long submitTime, long startTime, long finishTime, float setupProgress, float mapProgress, float reduceProgress, - float cleanupProgress, String jobFile, List amInfos) { + float cleanupProgress, String jobFile, List amInfos, + boolean isUber) { JobReport report = Records.newRecord(JobReport.class); report.setJobId(jobId); report.setJobName(jobName); @@ -75,6 +76,7 @@ public class MRBuilderUtils { report.setReduceProgress(reduceProgress); report.setJobFile(jobFile); report.setAMInfos(amInfos); + report.setIsUber(isUber); return report; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto index 3390b7ad845..95345ac8162 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto @@ -152,6 +152,7 @@ message JobReportProto { optional string jobFile = 13; repeated AMInfoProto am_infos = 14; optional int64 submit_time = 15; + optional bool is_uber = 16 [default = false]; } message AMInfoProto { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java index e5add2139f5..c10a4c0a640 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java @@ -97,7 +97,7 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus { String user, String jobName, String jobFile, String trackingUrl) { this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, - JobPriority.NORMAL, user, jobName, jobFile, trackingUrl); + JobPriority.NORMAL, user, jobName, jobFile, trackingUrl); } /** @@ -135,7 +135,8 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus { String user, String jobName, String jobFile, String trackingUrl) { this(jobid, 0.0f, mapProgress, reduceProgress, - cleanupProgress, runState, jp, user, jobName, jobFile, trackingUrl); + cleanupProgress, runState, jp, user, jobName, jobFile, + trackingUrl); } /** @@ -157,10 +158,57 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus { int runState, JobPriority jp, String user, String jobName, String jobFile, String trackingUrl) { this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, - runState, jp, - user, jobName, "default", jobFile, trackingUrl); + runState, jp, user, jobName, "default", jobFile, trackingUrl); } + + /** + * Create a job status object for a given jobid. + * @param jobid The jobid of the job + * @param setupProgress The progress made on the setup + * @param mapProgress The progress made on the maps + * @param reduceProgress The progress made on the reduces + * @param cleanupProgress The progress made on the cleanup + * @param runState The current state of the job + * @param jp Priority of the job. + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. + * @param isUber Whether job running in uber mode + */ + public JobStatus(JobID jobid, float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress, + int runState, JobPriority jp, String user, String jobName, + String jobFile, String trackingUrl, boolean isUber) { + this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, + runState, jp, user, jobName, "default", jobFile, trackingUrl, isUber); + } + /** + * Create a job status object for a given jobid. + * @param jobid The jobid of the job + * @param setupProgress The progress made on the setup + * @param mapProgress The progress made on the maps + * @param reduceProgress The progress made on the reduces + * @param cleanupProgress The progress made on the cleanup + * @param runState The current state of the job + * @param jp Priority of the job. + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param queue job queue name. + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. + */ + public JobStatus(JobID jobid, float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress, + int runState, JobPriority jp, + String user, String jobName, String queue, + String jobFile, String trackingUrl) { + this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, + runState, jp, + user, jobName, queue, jobFile, trackingUrl, false); + } + /** * Create a job status object for a given jobid. * @param jobid The jobid of the job @@ -175,25 +223,25 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus { * @param queue job queue name. * @param jobFile job configuration file. * @param trackingUrl link to the web-ui for details of the job. + * @param isUber Whether job running in uber mode */ public JobStatus(JobID jobid, float setupProgress, float mapProgress, float reduceProgress, float cleanupProgress, int runState, JobPriority jp, String user, String jobName, String queue, - String jobFile, String trackingUrl) { + String jobFile, String trackingUrl, boolean isUber) { super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()), - user, jobName, queue, jobFile, trackingUrl); + user, jobName, queue, jobFile, trackingUrl, isUber); } - public static JobStatus downgrade(org.apache.hadoop.mapreduce.JobStatus stat){ JobStatus old = new JobStatus(JobID.downgrade(stat.getJobID()), stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(), stat.getCleanupProgress(), stat.getState().getValue(), JobPriority.valueOf(stat.getPriority().name()), stat.getUsername(), stat.getJobName(), stat.getJobFile(), - stat.getTrackingUrl()); + stat.getTrackingUrl(), stat.isUber()); old.setStartTime(stat.getStartTime()); old.setFinishTime(stat.getFinishTime()); old.setSchedulingInfo(stat.getSchedulingInfo()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 5e92baa8b64..e4351536c80 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -467,6 +467,7 @@ public class Job extends JobContextImpl implements JobContext { sb.append("Job File: ").append(status.getJobFile()).append("\n"); sb.append("Job Tracking URL : ").append(status.getTrackingUrl()); sb.append("\n"); + sb.append("Uber job : ").append(status.isUber()).append("\n"); sb.append("map() completion: "); sb.append(status.getMapProgress()).append("\n"); sb.append("reduce() completion: "); @@ -1268,12 +1269,20 @@ public class Job extends JobContextImpl implements JobContext { Job.getProgressPollInterval(clientConf); /* make sure to report full progress after the job is done */ boolean reportedAfterCompletion = false; + boolean reportedUberMode = false; while (!isComplete() || !reportedAfterCompletion) { if (isComplete()) { reportedAfterCompletion = true; } else { Thread.sleep(progMonitorPollIntervalMillis); } + if (status.getState() == JobStatus.State.PREP) { + continue; + } + if (!reportedUberMode) { + reportedUberMode = true; + LOG.info("Job " + jobId + " running in uber mode : " + isUber()); + } String report = (" map " + StringUtils.formatPercent(mapProgress(), 0)+ " reduce " + @@ -1497,4 +1506,10 @@ public class Job extends JobContextImpl implements JobContext { conf.set(Job.OUTPUT_FILTER, newValue.toString()); } + public boolean isUber() throws IOException, InterruptedException { + ensureState(JobState.RUNNING); + updateStatus(); + return status.isUber(); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java index 6edda66ca15..bdd5a299eeb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java @@ -97,7 +97,7 @@ public class JobStatus implements Writable, Cloneable { private int usedMem; private int reservedMem; private int neededMem; - + private boolean isUber; /** */ @@ -115,17 +115,17 @@ public class JobStatus implements Writable, Cloneable { * @param jp Priority of the job. * @param user userid of the person who submitted the job. * @param jobName user-specified job name. - * @param jobFile job configuration file. + * @param jobFile job configuration file. * @param trackingUrl link to the web-ui for details of the job. */ public JobStatus(JobID jobid, float setupProgress, float mapProgress, - float reduceProgress, float cleanupProgress, + float reduceProgress, float cleanupProgress, State runState, JobPriority jp, String user, String jobName, String jobFile, String trackingUrl) { this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, - runState, jp, user, jobName, "default", jobFile, trackingUrl); + runState, jp, user, jobName, "default", jobFile, trackingUrl, false); } - + /** * Create a job status object for a given jobid. * @param jobid The jobid of the job @@ -138,14 +138,39 @@ public class JobStatus implements Writable, Cloneable { * @param user userid of the person who submitted the job. * @param jobName user-specified job name. * @param queue queue name - * @param jobFile job configuration file. + * @param jobFile job configuration file. * @param trackingUrl link to the web-ui for details of the job. */ public JobStatus(JobID jobid, float setupProgress, float mapProgress, - float reduceProgress, float cleanupProgress, - State runState, JobPriority jp, - String user, String jobName, String queue, + float reduceProgress, float cleanupProgress, + State runState, JobPriority jp, + String user, String jobName, String queue, String jobFile, String trackingUrl) { + this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, + runState, jp, user, jobName, queue, jobFile, trackingUrl, false); + } + + /** + * Create a job status object for a given jobid. + * @param jobid The jobid of the job + * @param setupProgress The progress made on the setup + * @param mapProgress The progress made on the maps + * @param reduceProgress The progress made on the reduces + * @param cleanupProgress The progress made on the cleanup + * @param runState The current state of the job + * @param jp Priority of the job. + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param queue queue name + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. + * @param isUber Whether job running in uber mode + */ + public JobStatus(JobID jobid, float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress, + State runState, JobPriority jp, + String user, String jobName, String queue, + String jobFile, String trackingUrl, boolean isUber) { this.jobid = jobid; this.setupProgress = setupProgress; this.mapProgress = mapProgress; @@ -161,8 +186,9 @@ public class JobStatus implements Writable, Cloneable { this.jobName = jobName; this.jobFile = jobFile; this.trackingUrl = trackingUrl; + this.isUber = isUber; } - + /** * Sets the map progress of this job @@ -411,6 +437,7 @@ public class JobStatus implements Writable, Cloneable { Text.writeString(out, jobName); Text.writeString(out, trackingUrl); Text.writeString(out, jobFile); + out.writeBoolean(isUber); // Serialize the job's ACLs out.writeInt(jobACLs.size()); @@ -438,6 +465,7 @@ public class JobStatus implements Writable, Cloneable { this.jobName = Text.readString(in); this.trackingUrl = Text.readString(in); this.jobFile = Text.readString(in); + this.isUber = in.readBoolean(); // De-serialize the job's ACLs int numACLs = in.readInt(); @@ -562,9 +590,26 @@ public class JobStatus implements Writable, Cloneable { this.neededMem = n; } + /** + * Whether job running in uber mode + * @return job in uber-mode + */ + public synchronized boolean isUber() { + return isUber; + } + + /** + * Set uber-mode flag + * @param isUber Whether job running in uber-mode + */ + public synchronized void setUber(boolean isUber) { + this.isUber = isUber; + } + public String toString() { StringBuffer buffer = new StringBuffer(); buffer.append("job-id : " + jobid); + buffer.append("uber-mode : " + isUber); buffer.append("map-progress : " + mapProgress); buffer.append("reduce-progress : " + reduceProgress); buffer.append("cleanup-progress : " + cleanupProgress); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index dd189e2eafa..621787981d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -150,6 +150,10 @@ public interface MRJobConfig { public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces"; + public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params"; + + public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params"; + public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; public static final String TASK_ID = "mapreduce.task.id"; @@ -298,12 +302,6 @@ public interface MRJobConfig { "mapreduce.job.ubertask.maxreduces"; public static final String JOB_UBERTASK_MAXBYTES = "mapreduce.job.ubertask.maxbytes"; - public static final String UBERTASK_JAVA_OPTS = - "mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts? - public static final String UBERTASK_ULIMIT = - "mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit? - public static final String UBERTASK_ENV = - "mapreduce.ubertask.child.env"; // or mapreduce.uber.env? public static final String MR_PREFIX = "yarn.app.mapreduce."; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java index f18cbe33180..7121620906e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java @@ -63,17 +63,20 @@ public class TestJobMonitorAndPrint extends TestCase { when(cluster.getConf()).thenReturn(conf); when(cluster.getClient()).thenReturn(clientProtocol); JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f, - State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url"); + State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", + "tmp-jobfile", "tmp-url"); job = Job.getInstance(cluster, jobStatus, conf); job = spy(job); } @Test public void testJobMonitorAndPrint() throws Exception { - JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f, 0.1f, 0f, - State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url"); - JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f, 1f, 1f, - State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url"); + JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f, + 0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", + "tmp-queue", "tmp-jobfile", "tmp-url", true); + JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f, + 1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname", + "tmp-queue", "tmp-jobfile", "tmp-url", true); doAnswer( new Answer() { @@ -102,15 +105,21 @@ public class TestJobMonitorAndPrint extends TestCase { String line; boolean foundHundred = false; boolean foundComplete = false; - String match_1 = "map 100% reduce 100%"; - String match_2 = "completed successfully"; + boolean foundUber = false; + String match_1 = "uber mode : true"; + String match_2 = "map 100% reduce 100%"; + String match_3 = "completed successfully"; while ((line = r.readLine()) != null) { - foundHundred = line.contains(match_1); + if (line.contains(match_1)) { + foundUber = true; + } + foundHundred = line.contains(match_2); if (foundHundred) break; } line = r.readLine(); - foundComplete = line.contains(match_2); + foundComplete = line.contains(match_3); + assertTrue(foundUber); assertTrue(foundHundred); assertTrue(foundComplete); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index ca4ab183ad1..c1b308935a7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -107,6 +107,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter .toYarn(TypeConverter.fromYarn(jobId)).getAppId())); report.setAMInfos(getAMInfos()); + report.setIsUber(isUber()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index c388759a4b4..b17cb427d34 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -168,7 +168,7 @@ public class TestClientServiceDelegate { GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class); when(jobReportResponse1.getJobReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user", - JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null)); + JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false)); // First AM returns a report with jobName firstGen and simulates AM shutdown // on second invocation. @@ -180,7 +180,7 @@ public class TestClientServiceDelegate { GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class); when(jobReportResponse2.getJobReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user", - JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null)); + JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false)); // Second AM generation returns a report with jobName secondGen MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java index f2f87b07b60..868c2d5ae3d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java @@ -49,6 +49,7 @@ public class TestUberAM extends TestMRJobs { } @Override + @Test public void testSleepJob() throws IOException, InterruptedException, ClassNotFoundException { if (mrCluster != null) { @@ -84,6 +85,7 @@ public class TestUberAM extends TestMRJobs { } @Override + @Test public void testRandomWriter() throws IOException, InterruptedException, ClassNotFoundException { super.testRandomWriter(); @@ -101,6 +103,7 @@ public class TestUberAM extends TestMRJobs { } @Override + @Test public void testFailingMapper() throws IOException, InterruptedException, ClassNotFoundException { LOG.info("\n\n\nStarting uberized testFailingMapper().");