MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs in the correct directory to work properly in secure mode. Contributed by Hitesh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213987 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-12-13 23:35:11 +00:00
parent 3954a2fb1c
commit b7ae5a6cb7
22 changed files with 402 additions and 233 deletions

View File

@ -282,6 +282,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3542. Support "FileSystemCounter" legacy counter group name for
compatibility. (tomwhite)
MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs
in the correct directory to work properly in secure mode. (Hitesh Shah via
vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -22,20 +22,19 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
@ -47,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.service.AbstractService;
/**
@ -80,7 +78,10 @@ public class LocalContainerLauncher extends AbstractService implements
super(LocalContainerLauncher.class.getName());
this.context = context;
this.umbilical = umbilical;
// umbilical: MRAppMaster creates (taskAttemptListener), passes to us
// (TODO/FIXME: pointless to use RPC to talk to self; should create
// LocalTaskAttemptListener or similar: implement umbilical protocol
// but skip RPC stuff)
try {
curFC = FileContext.getFileContext(curDir.toURI());
@ -152,7 +153,6 @@ public class LocalContainerLauncher extends AbstractService implements
* ]]
* - runs Task (runSubMap() or runSubReduce())
* - TA can safely send TA_UPDATE since in RUNNING state
* [modulo possible TA-state-machine race noted below: CHECK (TODO)]
*/
private class SubtaskRunner implements Runnable {
@ -162,6 +162,7 @@ public class LocalContainerLauncher extends AbstractService implements
SubtaskRunner() {
}
@SuppressWarnings("unchecked")
@Override
public void run() {
ContainerLauncherEvent event = null;
@ -183,7 +184,7 @@ public class LocalContainerLauncher extends AbstractService implements
ContainerRemoteLaunchEvent launchEv =
(ContainerRemoteLaunchEvent)event;
TaskAttemptId attemptID = launchEv.getTaskAttemptID();
Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
int numMapTasks = job.getTotalMaps();
@ -204,7 +205,6 @@ public class LocalContainerLauncher extends AbstractService implements
// port number is set to -1 in this case.
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(attemptID, -1));
//FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter)
if (numMapTasks == 0) {
doneWithMaps = true;
@ -259,6 +259,7 @@ public class LocalContainerLauncher extends AbstractService implements
}
}
@SuppressWarnings("deprecation")
private void runSubtask(org.apache.hadoop.mapred.Task task,
final TaskType taskType,
TaskAttemptId attemptID,
@ -270,6 +271,19 @@ public class LocalContainerLauncher extends AbstractService implements
try {
JobConf conf = new JobConf(getConfig());
conf.set(JobContext.TASK_ID, task.getTaskID().toString());
conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
conf.set(JobContext.ID, task.getJobID().toString());
// Use the AM's local dir env to generate the intermediate step
// output files
String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+ conf.get(MRConfig.LOCAL_DIR));
// mark this as an uberized subtask so it can set task counter
// (longer-term/FIXME: could redefine as job counter and send
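For context on why populating MRConfig.LOCAL_DIR matters here: the task-side MapOutputFile machinery allocates intermediate files through a LocalDirAllocator keyed on that same property, so whatever directories the AM injects above become the roots for file.out. A minimal sketch of that consumption path, assuming the 0.23-era APIs (the sample directory value is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;

public class LocalDirSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumption: a sample NM container dir; in the patch this value comes
    // from the AM's LOCAL_DIR_ENV environment variable.
    conf.setStrings(MRConfig.LOCAL_DIR,
        "/tmp/nm-local-dir/usercache/user/appcache/app_1/container_1");
    LocalDirAllocator alloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
    // Resolves under one of the configured dirs, creating parents as
    // needed; this is how map intermediate output lands in a per-container
    // path that is valid in secure mode.
    Path spill = alloc.getLocalPathForWrite("output/file.out", conf);
    System.out.println(spill);
  }
}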
@ -285,12 +299,12 @@ public class LocalContainerLauncher extends AbstractService implements
if (doneWithMaps) {
LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+ attemptID + "), but should be finished with maps");
throw new RuntimeException();
}
MapTask map = (MapTask)task;
map.setConf(conf);
//CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first?
map.run(conf, umbilical);
if (renameOutputs) {
@ -305,19 +319,23 @@ public class LocalContainerLauncher extends AbstractService implements
} else /* TaskType.REDUCE */ {
if (!doneWithMaps) {
// check if event-queue empty? whole idea of counting maps vs.
// checking event queue is a tad wacky...but could enforce ordering
// (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
// doesn't send reduce event until maps all done]
LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task (" LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+ attemptID + "), but not yet finished with maps"); + attemptID + "), but not yet finished with maps");
// throw new RuntimeException() (FIXME) // or push reduce event back onto end of queue? (probably former) throw new RuntimeException();
} }
ReduceTask reduce = (ReduceTask)task;
// a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner: // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
// set framework name to local to make task local // set framework name to local to make task local
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
ReduceTask reduce = (ReduceTask)task;
reduce.setConf(conf);
reduce.run(conf, umbilical); reduce.run(conf, umbilical);
//relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?) //relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
} }
@ -334,18 +352,7 @@ public class LocalContainerLauncher extends AbstractService implements
try {
if (task != null) {
// do cleanup for the task
// if (childUGI == null) { // no need to job into doAs block
task.taskCleanup(umbilical);
// } else {
// final Task taskFinal = task;
// childUGI.doAs(new PrivilegedExceptionAction<Object>() {
// @Override
// public Object run() throws Exception {
// taskFinal.taskCleanup(umbilical);
// return null;
// }
// });
// }
}
} catch (Exception e) {
LOG.info("Exception cleaning up: "
@ -354,51 +361,21 @@ public class LocalContainerLauncher extends AbstractService implements
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
// if (classicAttemptID != null) {
umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
// }
throw new RuntimeException();
} catch (Throwable throwable) {
LOG.fatal("Error running local (uberized) 'child' : "
+ StringUtils.stringifyException(throwable));
// if (classicAttemptID != null) {
Throwable tCause = throwable.getCause();
String cause = (tCause == null)
? throwable.getMessage()
: StringUtils.stringifyException(tCause);
umbilical.fatalError(classicAttemptID, cause);
// }
throw new RuntimeException();
} finally {
/*
FIXME: do we need to do any of this stuff? (guessing not since not in own JVM)
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
*/
}
}
/* FIXME: may not need renameMapOutputForReduce() anymore? TEST!
${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for containers;
contains launch_container.sh script, which, when executed, creates symlinks and
sets up env
"$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
"$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
"$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done
OHO! no further need for this at all? $taskId is unique per subtask
now => should work fine to leave alone. TODO: test with teragen or
similar
*/
/**
* Within the _local_ filesystem (not HDFS), all activity takes place within
* a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
@ -409,14 +386,21 @@ sets up env
* filenames instead of "file.out". (All of this is entirely internal,
* so there are no particular compatibility issues.)
*/
@SuppressWarnings("deprecation")
private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
MapOutputFile subMapOutputFile)
throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
// move map output to reduce input
Path mapOut = subMapOutputFile.getOutputFile();
FileStatus mStatus = localFs.getFileStatus(mapOut);
Path reduceIn = subMapOutputFile.getInputFileForWrite(
TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming map output file for task attempt "
+ mapId.toString() + " from original location " + mapOut.toString()
+ " to destination " + reduceIn.toString());
}
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
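The hunk ends before the rename itself; what follows is a hedged sketch of how the method plausibly finishes (the data file and its index both have to move; the ".index" suffix and the exists-check are assumptions, not quoted from the patch):

// Sketch of the method tail, under the assumptions stated above:
Path mapOutIndex = new Path(mapOut.toString() + ".index");
if (!localFs.rename(mapOut, reduceIn)) {
  throw new IOException("Couldn't rename " + mapOut);
}
if (localFs.exists(mapOutIndex)
    && !localFs.rename(mapOutIndex,
           new Path(reduceIn.toString() + ".index"))) {
  throw new IOException("Couldn't rename " + mapOutIndex);
}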
@ -429,8 +413,7 @@ sets up env
* Also within the local filesystem, we need to restore the initial state
* of the directory as much as possible. Compare current contents against
* the saved original state and nuke everything that doesn't belong, with
* the exception of the renamed map outputs.
FIXME: do we really need to worry about renamed map outputs, or already moved to output dir on commit? if latter, fix comment
*
* Any jobs that go out of their way to rename or delete things from the
* local directory are considered broken and deserve what they get...

View File

@ -236,6 +236,13 @@ public class MapReduceChildJVM {
getTaskLogFile(TaskLog.LogName.PROFILE)
)
);
if (task.isMapTask()) {
vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
}
else {
vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
}
}
}
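A short usage sketch for the two new keys this block reads; the hprof strings are illustrative values, not defaults shipped by the patch:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

JobConf conf = new JobConf();
conf.setBoolean(MRJobConfig.TASK_PROFILE, true);  // "mapreduce.task.profile"
conf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
    "-agentlib:hprof=cpu=samples,force=n,thread=y,verbose=n,file=%s");
conf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS,
    "-agentlib:hprof=heap=sites,force=n,thread=y,verbose=n,file=%s");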

View File

@ -156,6 +156,7 @@ public class MRAppMaster extends CompositeService {
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
@ -265,8 +266,9 @@ public class MRAppMaster extends CompositeService {
addIfService(speculator);
}
speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
dispatcher.register(Speculator.EventType.class,
speculatorEventDispatcher);
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(clientService, context);
@ -790,10 +792,6 @@ public class MRAppMaster extends CompositeService {
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
// send init to speculator. This won't yest start as dispatcher isn't
// started yet.
dispatcher.getEventHandler().handle(
new SpeculatorEvent(job.getID(), clock.getTime()));
// JobImpl's InitTransition is done (call above is synchronous), so the
// "uber-decision" (MR-1220) has been made. Query job and switch to
@ -801,9 +799,15 @@ public class MRAppMaster extends CompositeService {
// and container-launcher services/event-handlers).
if (job.isUber()) {
speculatorEventDispatcher.disableSpeculation();
LOG.info("MRAppMaster uberizing job " + job.getID()
+ " in local container (\"uber-AM\") on node "
+ nmHost + ":" + nmPort + ".");
} else {
// send init to speculator only for non-uber jobs.
// This won't yet start as dispatcher isn't started yet.
dispatcher.getEventHandler().handle(
new SpeculatorEvent(job.getID(), clock.getTime()));
LOG.info("MRAppMaster launching normal, non-uberized, multi-container " LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + "."); + "job " + job.getID() + ".");
} }
@ -865,17 +869,24 @@ public class MRAppMaster extends CompositeService {
private class SpeculatorEventDispatcher implements
EventHandler<SpeculatorEvent> {
private final Configuration conf;
private volatile boolean disabled;
public SpeculatorEventDispatcher(Configuration config) {
this.conf = config;
}
@Override
public void handle(SpeculatorEvent event) {
if (!disabled &&
(conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) {
// Speculator IS enabled, direct the event to there.
speculator.handle(event);
}
}
public void disableSpeculation() {
disabled = true;
}
}
private static void validateInputParam(String value, String param)

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@ -583,13 +584,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
}
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
computeProgress(mapTasks), computeProgress(reduceTasks),
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} finally {
readLock.unlock();
}
@ -812,6 +813,129 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return amInfos;
}
/**
* Decide whether job can be run in uber mode based on various criteria.
* @param dataInputLength Total length for all splits
*/
private void makeUberDecision(long dataInputLength) {
//FIXME: need new memory criterion for uber-decision (oops, too late here;
// until AM-resizing supported,
// must depend on job client to pass fat-slot needs)
// these are no longer "system" settings, necessarily; user may override
int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
//FIXME: handling multiple reduces within a single AM does not seem to
//work.
// int sysMaxReduces =
// job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
int sysMaxReduces = 1;
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
// wrong; get FS from [File?]InputFormat and default block size from that
long sysMemSizeForUberSlot =
conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
boolean uberEnabled =
conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
boolean smallInput = (dataInputLength <= sysMaxBytes);
// ignoring overhead due to UberAM and statics as negligible here:
boolean smallMemory =
( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
<= sysMemSizeForUberSlot)
|| (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
boolean notChainJob = !isChainJob(conf);
// User has overall veto power over uberization, or user can modify
// limits (overriding system settings and potentially shooting
// themselves in the head). Note that ChainMapper/Reducer are
// fundamentally incompatible with MR-1220; they employ a blocking
// queue between the maps/reduces and thus require parallel execution,
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
// and thus requires sequential execution.
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
&& smallInput && smallMemory && notChainJob;
if (isUber) {
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
+ numReduceTasks + "r tasks (" + dataInputLength
+ " input bytes) will run sequentially on single node.");
// make sure reduces are scheduled only after all map are completed
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
1.0f);
// uber-subtask attempts all get launched on same node; if one fails,
// probably should retry elsewhere, i.e., move entire uber-AM: ergo,
// limit attempts to 1 (or at most 2? probably not...)
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
} else {
StringBuilder msg = new StringBuilder();
msg.append("Not uberizing ").append(jobId).append(" because:");
if (!uberEnabled)
msg.append(" not enabled;");
if (!smallNumMapTasks)
msg.append(" too many maps;");
if (!smallNumReduceTasks)
msg.append(" too many reduces;");
if (!smallInput)
msg.append(" too much input;");
if (!smallMemory)
msg.append(" too much RAM;");
if (!notChainJob)
msg.append(" chainjob");
LOG.info(msg.toString());
}
}
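A hedged example of a client-side configuration that would satisfy every criterion above for a small job (values are illustrative; the maxreduces limit is effectively pinned to 1 by the code above regardless of what the user sets):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);  // user opt-in
conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);        // matches the default above
conf.setLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, 64L * 1024 * 1024);
// With <= 9 maps, <= 1 reduce, <= 64MB of input, and task memory within
// the AM slot, makeUberDecision() sets isUber = true.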
/**
* ChainMapper and ChainReducer must execute in parallel, so they're not
* compatible with uberization/LocalContainerLauncher (100% sequential).
*/
private boolean isChainJob(Configuration conf) {
boolean isChainJob = false;
try {
String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
if (mapClassName != null) {
Class<?> mapClass = Class.forName(mapClassName);
if (ChainMapper.class.isAssignableFrom(mapClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainMapper
}
try {
String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
if (reduceClassName != null) {
Class<?> reduceClass = Class.forName(reduceClassName);
if (ChainReducer.class.isAssignableFrom(reduceClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainReducer
}
return isChainJob;
}
/*
private int getBlockSize() {
String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
if (inputClassName != null) {
Class<?> inputClass - Class.forName(inputClassName);
if (FileInputFormat<K, V>)
}
}
*/
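One possible shape of what the FIXME above (and this commented-out stub) asks for, offered purely as a hedged sketch: derive the byte threshold's default from the job's input filesystem instead of the hard-coded dfs.block.size fallback:

// Hypothetical replacement for the sysMaxBytes default in makeUberDecision():
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
    FileSystem.get(conf).getDefaultBlockSize());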
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
@ -863,80 +987,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
job.makeUberDecision(inputLength);
//FIXME: need new memory criterion for uber-decision (oops, too late here;
// until AM-resizing supported, must depend on job client to pass fat-slot needs)
// these are no longer "system" settings, necessarily; user may override
int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
int sysMaxReduces =
job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
// wrong; get FS from [File?]InputFormat and default block size from that
//long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot();
// FIXME [could use default AM-container memory size...]
boolean uberEnabled =
job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
boolean smallNumMapTasks = (job.numMapTasks <= sysMaxMaps);
boolean smallNumReduceTasks = (job.numReduceTasks <= sysMaxReduces);
boolean smallInput = (inputLength <= sysMaxBytes);
boolean smallMemory = true; //FIXME (see above)
// ignoring overhead due to UberTask and statics as negligible here:
// FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
// || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
boolean notChainJob = !isChainJob(job.conf);
// User has overall veto power over uberization, or user can modify
// limits (overriding system settings and potentially shooting
// themselves in the head). Note that ChainMapper/Reducer are
// fundamentally incompatible with MR-1220; they employ a blocking
// queue between the maps/reduces and thus require parallel execution,
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
// and thus requires sequential execution.
job.isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
&& smallInput && smallMemory && notChainJob;
if (job.isUber) {
LOG.info("Uberizing job " + job.jobId + ": " + job.numMapTasks + "m+"
+ job.numReduceTasks + "r tasks (" + inputLength
+ " input bytes) will run sequentially on single node.");
//TODO: also note which node?
// make sure reduces are scheduled only after all map are completed
job.conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
1.0f);
// uber-subtask attempts all get launched on same node; if one fails,
// probably should retry elsewhere, i.e., move entire uber-AM: ergo,
// limit attempts to 1 (or at most 2? probably not...)
job.conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation: makes no sense to speculate an entire job
//canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old
//version, ultimately was from conf.getMapSpeculativeExecution(),
//conf.getReduceSpeculativeExecution()]
} else {
StringBuilder msg = new StringBuilder();
msg.append("Not uberizing ").append(job.jobId).append(" because:");
if (!uberEnabled)
msg.append(" not enabled;");
if (!smallNumMapTasks)
msg.append(" too many maps;");
if (!smallNumReduceTasks)
msg.append(" too many reduces;");
if (!smallInput)
msg.append(" too much input;");
if (!smallMemory)
msg.append(" too much RAM;");
if (!notChainJob)
msg.append(" chainjob");
LOG.info(msg.toString());
}
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
@ -1008,35 +1059,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
}
/**
* ChainMapper and ChainReducer must execute in parallel, so they're not
* compatible with uberization/LocalContainerLauncher (100% sequential).
*/
boolean isChainJob(Configuration conf) {
boolean isChainJob = false;
try {
String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
if (mapClassName != null) {
Class<?> mapClass = Class.forName(mapClassName);
if (ChainMapper.class.isAssignableFrom(mapClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainMapper
}
try {
String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
if (reduceClassName != null) {
Class<?> reduceClass = Class.forName(reduceClassName);
if (ChainReducer.class.isAssignableFrom(reduceClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainReducer
}
return isChainJob;
}
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {

View File

@ -60,8 +60,8 @@ public class LocalContainerAllocator extends RMCommunicator
private static final Log LOG =
LogFactory.getLog(LocalContainerAllocator.class);
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
// private final ApplicationId appID;
private AtomicInteger containerCount = new AtomicInteger();
private long retryInterval;
private long retrystartTime;
@ -73,8 +73,6 @@ public class LocalContainerAllocator extends RMCommunicator
AppContext context) {
super(clientService, context);
this.eventHandler = context.getEventHandler();
// this.appID = context.getApplicationID();
}
@Override
@ -88,6 +86,7 @@ public class LocalContainerAllocator extends RMCommunicator
retrystartTime = System.currentTimeMillis();
}
@SuppressWarnings("unchecked")
@Override
protected synchronized void heartbeat() throws Exception {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
@ -124,6 +123,7 @@ public class LocalContainerAllocator extends RMCommunicator
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerAllocatorEvent event) {
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -65,7 +66,7 @@ public abstract class RMCommunicator extends AbstractService {
private int rmPollInterval;//millis
protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId;
private AtomicBoolean stopped;
protected Thread allocatorThread;
protected EventHandler eventHandler;
protected AMRMProtocol scheduler;
@ -88,6 +89,7 @@ public abstract class RMCommunicator extends AbstractService {
this.eventHandler = context.getEventHandler();
this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId();
this.stopped = new AtomicBoolean(false);
}
@Override
@ -213,7 +215,10 @@ public abstract class RMCommunicator extends AbstractService {
@Override
public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
allocatorThread.interrupt();
try {
allocatorThread.join();
@ -228,7 +233,7 @@ public abstract class RMCommunicator extends AbstractService {
allocatorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(rmPollInterval);
try {
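The change above is the standard idempotent-stop pattern; condensed for reference, getAndSet ensures that even if several service-shutdown paths race into stop(), only the first caller interrupts and joins the allocator thread:

private final AtomicBoolean stopped = new AtomicBoolean(false);

public void stop() {
  if (stopped.getAndSet(true)) {
    return;  // a later caller sees true and leaves the thread alone
  }
  allocatorThread.interrupt();  // only the winning caller gets here
  // ... join the thread, then delegate to super.stop()
}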

View File

@ -183,6 +183,7 @@ public class TestMRClientService {
Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
.getAttemptId());
Assert.assertTrue(amInfo.getStartTime() > 0);
Assert.assertEquals(false, jr.isUber());
}
private void verifyTaskAttemptReport(TaskAttemptReport tar) {

View File

@ -118,7 +118,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -195,7 +195,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -261,7 +261,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -375,7 +375,7 @@ public class TestRMContainerAllocator {
public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
}
}
@ -511,7 +511,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -610,7 +610,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);

View File

@ -288,7 +288,7 @@ public class TypeConverter {
.getMapProgress(), jobreport.getReduceProgress(), jobreport
.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
.getJobFile(), trackingUrl, jobreport.isUber());
jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus;
}
@ -421,7 +421,7 @@ public class TypeConverter {
TypeConverter.fromYarn(application.getYarnApplicationState()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL,
application.getUser(), application.getName(),
application.getQueue(), jobFile, trackingUrl, false
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());

View File

@ -36,6 +36,7 @@ public interface JobReport {
public abstract String getDiagnostics();
public abstract String getJobFile();
public abstract List<AMInfo> getAMInfos();
public abstract boolean isUber();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@ -52,4 +53,5 @@ public interface JobReport {
public abstract void setDiagnostics(String diagnostics);
public abstract void setJobFile(String jobFile);
public abstract void setAMInfos(List<AMInfo> amInfos);
public abstract void setIsUber(boolean isUber);
}

View File

@ -332,4 +332,16 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
private JobState convertFromProtoFormat(JobStateProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
@Override
public synchronized boolean isUber() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getIsUber();
}
@Override
public synchronized void setIsUber(boolean isUber) {
maybeInitBuilder();
builder.setIsUber(isUber);
}
}

View File

@ -60,7 +60,8 @@ public class MRBuilderUtils {
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress, String jobFile, List<AMInfo> amInfos,
boolean isUber) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
@ -75,6 +76,7 @@ public class MRBuilderUtils {
report.setReduceProgress(reduceProgress);
report.setJobFile(jobFile);
report.setAMInfos(amInfos);
report.setIsUber(isUber);
return report;
}

View File

@ -152,6 +152,7 @@ message JobReportProto {
optional string jobFile = 13;
repeated AMInfoProto am_infos = 14;
optional int64 submit_time = 15;
optional bool is_uber = 16 [default = false];
}
message AMInfoProto {

View File

@ -135,7 +135,8 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
String user, String jobName, String jobFile,
String trackingUrl) {
this(jobid, 0.0f, mapProgress, reduceProgress,
cleanupProgress, runState, jp, user, jobName, jobFile,
trackingUrl);
}
/**
@ -157,8 +158,30 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
int runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp, user, jobName, "default", jobFile, trackingUrl);
}
/**
* Create a job status object for a given jobid.
* @param jobid The jobid of the job
* @param setupProgress The progress made on the setup
* @param mapProgress The progress made on the maps
* @param reduceProgress The progress made on the reduces
* @param cleanupProgress The progress made on the cleanup
* @param runState The current state of the job
* @param jp Priority of the job.
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
* @param isUber Whether job running in uber mode
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress,
int runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl, boolean isUber) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp, user, jobName, "default", jobFile, trackingUrl, isUber);
}
/**
@ -181,11 +204,36 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
int runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp,
user, jobName, queue, jobFile, trackingUrl, false);
}
/**
* Create a job status object for a given jobid.
* @param jobid The jobid of the job
* @param setupProgress The progress made on the setup
* @param mapProgress The progress made on the maps
* @param reduceProgress The progress made on the reduces
* @param cleanupProgress The progress made on the cleanup
* @param runState The current state of the job
* @param jp Priority of the job.
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param queue job queue name.
* @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
* @param isUber Whether job running in uber mode
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress,
int runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl, boolean isUber) {
super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()),
user, jobName, queue, jobFile, trackingUrl, isUber);
}
public static JobStatus downgrade(org.apache.hadoop.mapreduce.JobStatus stat){
JobStatus old = new JobStatus(JobID.downgrade(stat.getJobID()),
@ -193,7 +241,7 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
stat.getCleanupProgress(), stat.getState().getValue(),
JobPriority.valueOf(stat.getPriority().name()),
stat.getUsername(), stat.getJobName(), stat.getJobFile(),
stat.getTrackingUrl(), stat.isUber());
old.setStartTime(stat.getStartTime());
old.setFinishTime(stat.getFinishTime());
old.setSchedulingInfo(stat.getSchedulingInfo());

View File

@ -467,6 +467,7 @@ public class Job extends JobContextImpl implements JobContext {
sb.append("Job File: ").append(status.getJobFile()).append("\n"); sb.append("Job File: ").append(status.getJobFile()).append("\n");
sb.append("Job Tracking URL : ").append(status.getTrackingUrl()); sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
sb.append("\n"); sb.append("\n");
sb.append("Uber job : ").append(status.isUber()).append("\n");
sb.append("map() completion: "); sb.append("map() completion: ");
sb.append(status.getMapProgress()).append("\n"); sb.append(status.getMapProgress()).append("\n");
sb.append("reduce() completion: "); sb.append("reduce() completion: ");
@ -1268,12 +1269,20 @@ public class Job extends JobContextImpl implements JobContext {
Job.getProgressPollInterval(clientConf);
/* make sure to report full progress after the job is done */
boolean reportedAfterCompletion = false;
boolean reportedUberMode = false;
while (!isComplete() || !reportedAfterCompletion) {
if (isComplete()) {
reportedAfterCompletion = true;
} else {
Thread.sleep(progMonitorPollIntervalMillis);
}
if (status.getState() == JobStatus.State.PREP) {
continue;
}
if (!reportedUberMode) {
reportedUberMode = true;
LOG.info("Job " + jobId + " running in uber mode : " + isUber());
}
String report =
(" map " + StringUtils.formatPercent(mapProgress(), 0)+
" reduce " +
@ -1497,4 +1506,10 @@ public class Job extends JobContextImpl implements JobContext {
conf.set(Job.OUTPUT_FILTER, newValue.toString());
}
public boolean isUber() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.isUber();
}
}
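A client-side usage sketch for the new accessor (the setup lines are hypothetical; isUber() requires the job to be RUNNING because of the ensureState() call above):

Job job = Job.getInstance(conf);  // assumption: conf already set up
job.submit();                     // job enters RUNNING state
boolean uber = job.isUber();      // refreshes status, then reads the flag
System.out.println("running in uber mode: " + uber);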

View File

@ -97,7 +97,7 @@ public class JobStatus implements Writable, Cloneable {
private int usedMem;
private int reservedMem;
private int neededMem;
private boolean isUber;
/**
*/
@ -123,7 +123,7 @@ public class JobStatus implements Writable, Cloneable {
State runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp, user, jobName, "default", jobFile, trackingUrl, false);
}
/**
@ -146,6 +146,31 @@ public class JobStatus implements Writable, Cloneable {
State runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp, user, jobName, queue, jobFile, trackingUrl, false);
}
/**
* Create a job status object for a given jobid.
* @param jobid The jobid of the job
* @param setupProgress The progress made on the setup
* @param mapProgress The progress made on the maps
* @param reduceProgress The progress made on the reduces
* @param cleanupProgress The progress made on the cleanup
* @param runState The current state of the job
* @param jp Priority of the job.
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param queue queue name
* @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
* @param isUber Whether job running in uber mode
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress,
State runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl, boolean isUber) {
this.jobid = jobid;
this.setupProgress = setupProgress;
this.mapProgress = mapProgress;
@ -161,6 +186,7 @@ public class JobStatus implements Writable, Cloneable {
this.jobName = jobName;
this.jobFile = jobFile;
this.trackingUrl = trackingUrl;
this.isUber = isUber;
}
@ -411,6 +437,7 @@ public class JobStatus implements Writable, Cloneable {
Text.writeString(out, jobName);
Text.writeString(out, trackingUrl);
Text.writeString(out, jobFile);
out.writeBoolean(isUber);
// Serialize the job's ACLs
out.writeInt(jobACLs.size());
@ -438,6 +465,7 @@ public class JobStatus implements Writable, Cloneable {
this.jobName = Text.readString(in);
this.trackingUrl = Text.readString(in);
this.jobFile = Text.readString(in);
this.isUber = in.readBoolean();
// De-serialize the job's ACLs
int numACLs = in.readInt();
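Because the new boolean sits at a fixed position between jobFile and the ACLs, write() and readFields() must stay symmetric. A hypothetical round-trip check using the org.apache.hadoop.io buffers:

DataOutputBuffer out = new DataOutputBuffer();
status.write(out);                        // isUber written right after jobFile
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
JobStatus copy = new JobStatus();         // assumption: no-arg Writable ctor
copy.readFields(in);
assert copy.isUber() == status.isUber();  // the flag survives the round trip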
@ -562,9 +590,26 @@ public class JobStatus implements Writable, Cloneable {
this.neededMem = n;
}
/**
* Whether job running in uber mode
* @return job in uber-mode
*/
public synchronized boolean isUber() {
return isUber;
}
/**
* Set uber-mode flag
* @param isUber Whether job running in uber-mode
*/
public synchronized void setUber(boolean isUber) {
this.isUber = isUber;
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("job-id : " + jobid);
buffer.append("uber-mode : " + isUber);
buffer.append("map-progress : " + mapProgress);
buffer.append("reduce-progress : " + reduceProgress);
buffer.append("cleanup-progress : " + cleanupProgress);


@@ -150,6 +150,10 @@ public interface MRJobConfig {
   public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces";

+  public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params";
+
+  public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
+
   public static final String TASK_TIMEOUT = "mapreduce.task.timeout";

   public static final String TASK_ID = "mapreduce.task.id";
@@ -298,12 +302,6 @@ public interface MRJobConfig {
       "mapreduce.job.ubertask.maxreduces";
   public static final String JOB_UBERTASK_MAXBYTES =
       "mapreduce.job.ubertask.maxbytes";
-  public static final String UBERTASK_JAVA_OPTS =
-      "mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts?
-  public static final String UBERTASK_ULIMIT =
-      "mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit?
-  public static final String UBERTASK_ENV =
-      "mapreduce.ubertask.child.env"; // or mapreduce.uber.env?
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
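
The two keys added above give map and reduce tasks independently tunable profiler arguments, while the provisional mapreduce.ubertask.* overrides (note their trailing "?" comments) are dropped. A hedged configuration sketch; the hprof string is illustrative, not mandated by the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class ProfileParamsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.TASK_PROFILE, true);  // existing master switch
        // New per-task-type keys; by MapReduce convention a %s in the value
        // is substituted with the profile output file.
        conf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
            "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
        conf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS,
            "-agentlib:hprof=cpu=samples,force=n,thread=y,verbose=n,file=%s");
      }
    }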


@@ -63,17 +63,20 @@ public class TestJobMonitorAndPrint extends TestCase {
     when(cluster.getConf()).thenReturn(conf);
     when(cluster.getClient()).thenReturn(clientProtocol);
     JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f,
-        State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
+        State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+        "tmp-jobfile", "tmp-url");
     job = Job.getInstance(cluster, jobStatus, conf);
     job = spy(job);
   }

   @Test
   public void testJobMonitorAndPrint() throws Exception {
-    JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f, 0.1f, 0f,
-        State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
-    JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f, 1f, 1f,
-        State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
+    JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
+        0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+        "tmp-queue", "tmp-jobfile", "tmp-url", true);
+    JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
+        1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+        "tmp-queue", "tmp-jobfile", "tmp-url", true);
     doAnswer(
         new Answer<TaskCompletionEvent[]>() {
@@ -102,15 +105,21 @@ public class TestJobMonitorAndPrint extends TestCase {
     String line;
     boolean foundHundred = false;
     boolean foundComplete = false;
-    String match_1 = "map 100% reduce 100%";
-    String match_2 = "completed successfully";
+    boolean foundUber = false;
+    String match_1 = "uber mode : true";
+    String match_2 = "map 100% reduce 100%";
+    String match_3 = "completed successfully";
     while ((line = r.readLine()) != null) {
-      foundHundred = line.contains(match_1);
+      if (line.contains(match_1)) {
+        foundUber = true;
+      }
+      foundHundred = line.contains(match_2);
       if (foundHundred)
         break;
     }
     line = r.readLine();
-    foundComplete = line.contains(match_2);
+    foundComplete = line.contains(match_3);
+    assertTrue(foundUber);
     assertTrue(foundHundred);
     assertTrue(foundComplete);
   }
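
For reference, the three match strings correspond to client-side progress lines of roughly this shape (the wording around the matched substrings is assumed, not captured output):

    ... running in uber mode : true
    ... map 100% reduce 100%
    ... completed successfully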


@@ -107,6 +107,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
     report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
         .toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
     report.setAMInfos(getAMInfos());
+    report.setIsUber(isUber());
   }

   @Override
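
On the consumer side, history clients can now distinguish uberized jobs from a completed job's report. A hedged sketch (the JobReport getter is assumed to be isUber(), mirroring the Job interface method used above):

    import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
    import org.apache.hadoop.mapreduce.v2.app.job.Job;

    public class UberFlagProbe {
      // Hypothetical helper, not part of the patch.
      static void printUberFlag(Job job) {
        JobReport report = job.getReport();
        if (report.isUber()) {
          System.out.println(report.getJobId() + " ran uberized in the AM");
        }
      }
    }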


@@ -168,7 +168,7 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
     when(jobReportResponse1.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));

     // First AM returns a report with jobName firstGen and simulates AM shutdown
     // on second invocation.
@@ -180,7 +180,7 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
     when(jobReportResponse2.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));

     // Second AM generation returns a report with jobName secondGen
     MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
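
These call sites imply a builder of roughly the following shape, now with a trailing isUber flag; the parameter names are reconstructions from the argument order, not the source:

    import java.util.List;
    import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
    import org.apache.hadoop.mapreduce.v2.api.records.JobState;

    // Shape inferred from the mocked calls above; treat as an approximation.
    interface JobReportBuilderShape {
      JobReport newJobReport(JobId jobId, String jobName, String userName,
          JobState state, long submitTime, long startTime, long finishTime,
          float setupProgress, float mapProgress, float reduceProgress,
          float cleanupProgress, String jobFile, List<AMInfo> amInfos,
          boolean isUber);
    }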


@@ -49,6 +49,7 @@ public class TestUberAM extends TestMRJobs {
   }

   @Override
+  @Test
   public void testSleepJob()
       throws IOException, InterruptedException, ClassNotFoundException {
     if (mrCluster != null) {
@@ -84,6 +85,7 @@ public class TestUberAM extends TestMRJobs {
   }

   @Override
+  @Test
   public void testRandomWriter()
       throws IOException, InterruptedException, ClassNotFoundException {
     super.testRandomWriter();
@@ -101,6 +103,7 @@ public class TestUberAM extends TestMRJobs {
   }

   @Override
+  @Test
   public void testFailingMapper()
       throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("\n\n\nStarting uberized testFailingMapper().");
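
The only change in TestUberAM is re-annotating the overridden methods. A minimal sketch of the pattern (class names invented; whether a JUnit 4 runner picks up an un-annotated override of an inherited @Test method is runner-dependent, so annotating the override removes the ambiguity):

    import org.junit.Test;

    class BaseJobsTest {
      @Test
      public void testSleepJob() { /* runs a plain MR sleep job */ }
    }

    public class UberVariantTest extends BaseJobsTest {
      @Override
      @Test  // annotate the override too, so it is unambiguously a test method
      public void testSleepJob() {
        super.testSleepJob();
        // ...uber-specific assertions would follow here...
      }
    }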