MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs in the correct directory to work properly in secure mode. Contributed by Hitesh Shah.

svn merge -c 1213987 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1213989 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-12-13 23:42:50 +00:00
parent affa6d2ee1
commit 88fe1ae2ed
22 changed files with 402 additions and 233 deletions

View File

@ -230,6 +230,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3537. Fix race condition in DefaultContainerExecutor which led
to container localization occuring in wrong directories. (acmurthy)
MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs
in the correct directory to work properly in secure mode. (Hitesh Shah via
vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -22,20 +22,19 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
@ -47,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.service.AbstractService;
/**
@ -80,7 +78,10 @@ public class LocalContainerLauncher extends AbstractService implements
super(LocalContainerLauncher.class.getName());
this.context = context;
this.umbilical = umbilical;
// umbilical: MRAppMaster creates (taskAttemptListener), passes to us (TODO/FIXME: pointless to use RPC to talk to self; should create LocalTaskAttemptListener or similar: implement umbilical protocol but skip RPC stuff)
// umbilical: MRAppMaster creates (taskAttemptListener), passes to us
// (TODO/FIXME: pointless to use RPC to talk to self; should create
// LocalTaskAttemptListener or similar: implement umbilical protocol
// but skip RPC stuff)
try {
curFC = FileContext.getFileContext(curDir.toURI());
@ -152,7 +153,6 @@ public class LocalContainerLauncher extends AbstractService implements
* ]]
* - runs Task (runSubMap() or runSubReduce())
* - TA can safely send TA_UPDATE since in RUNNING state
* [modulo possible TA-state-machine race noted below: CHECK (TODO)]
*/
private class SubtaskRunner implements Runnable {
@ -162,6 +162,7 @@ public class LocalContainerLauncher extends AbstractService implements
SubtaskRunner() {
}
@SuppressWarnings("unchecked")
@Override
public void run() {
ContainerLauncherEvent event = null;
@ -183,7 +184,7 @@ public class LocalContainerLauncher extends AbstractService implements
ContainerRemoteLaunchEvent launchEv =
(ContainerRemoteLaunchEvent)event;
TaskAttemptId attemptID = launchEv.getTaskAttemptID(); //FIXME: can attemptID ever be null? (only if retrieved over umbilical?)
TaskAttemptId attemptID = launchEv.getTaskAttemptID();
Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
int numMapTasks = job.getTotalMaps();
@ -204,7 +205,6 @@ public class LocalContainerLauncher extends AbstractService implements
// port number is set to -1 in this case.
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(attemptID, -1));
//FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter)
if (numMapTasks == 0) {
doneWithMaps = true;
@ -259,6 +259,7 @@ public class LocalContainerLauncher extends AbstractService implements
}
}
@SuppressWarnings("deprecation")
private void runSubtask(org.apache.hadoop.mapred.Task task,
final TaskType taskType,
TaskAttemptId attemptID,
@ -270,6 +271,19 @@ public class LocalContainerLauncher extends AbstractService implements
try {
JobConf conf = new JobConf(getConfig());
conf.set(JobContext.TASK_ID, task.getTaskID().toString());
conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
conf.set(JobContext.ID, task.getJobID().toString());
// Use the AM's local dir env to generate the intermediate step
// output files
String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+ conf.get(MRConfig.LOCAL_DIR));
// mark this as an uberized subtask so it can set task counter
// (longer-term/FIXME: could redefine as job counter and send
@ -285,12 +299,12 @@ public class LocalContainerLauncher extends AbstractService implements
if (doneWithMaps) {
LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+ attemptID + "), but should be finished with maps");
// throw new RuntimeException() (FIXME: what's appropriate here?)
throw new RuntimeException();
}
MapTask map = (MapTask)task;
map.setConf(conf);
//CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first?
map.run(conf, umbilical);
if (renameOutputs) {
@ -305,19 +319,23 @@ public class LocalContainerLauncher extends AbstractService implements
} else /* TaskType.REDUCE */ {
if (!doneWithMaps) {
//check if event-queue empty? whole idea of counting maps vs. checking event queue is a tad wacky...but could enforce ordering (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): doesn't send reduce event until maps all done]
// check if event-queue empty? whole idea of counting maps vs.
// checking event queue is a tad wacky...but could enforce ordering
// (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
// doesn't send reduce event until maps all done]
LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+ attemptID + "), but not yet finished with maps");
// throw new RuntimeException() (FIXME) // or push reduce event back onto end of queue? (probably former)
throw new RuntimeException();
}
ReduceTask reduce = (ReduceTask)task;
// a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
// set framework name to local to make task local
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
ReduceTask reduce = (ReduceTask)task;
reduce.setConf(conf);
reduce.run(conf, umbilical);
//relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
}
@ -334,18 +352,7 @@ public class LocalContainerLauncher extends AbstractService implements
try {
if (task != null) {
// do cleanup for the task
// if (childUGI == null) { // no need to job into doAs block
task.taskCleanup(umbilical);
// } else {
// final Task taskFinal = task;
// childUGI.doAs(new PrivilegedExceptionAction<Object>() {
// @Override
// public Object run() throws Exception {
// taskFinal.taskCleanup(umbilical);
// return null;
// }
// });
// }
}
} catch (Exception e) {
LOG.info("Exception cleaning up: "
@ -354,51 +361,21 @@ public class LocalContainerLauncher extends AbstractService implements
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
// if (classicAttemptID != null) {
umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
// }
throw new RuntimeException();
} catch (Throwable throwable) {
LOG.fatal("Error running local (uberized) 'child' : "
+ StringUtils.stringifyException(throwable));
// if (classicAttemptID != null) {
Throwable tCause = throwable.getCause();
String cause = (tCause == null)
? throwable.getMessage()
: StringUtils.stringifyException(tCause);
umbilical.fatalError(classicAttemptID, cause);
// }
throw new RuntimeException();
} finally {
/*
FIXME: do we need to do any of this stuff? (guessing not since not in own JVM)
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
*/
}
}
/* FIXME: may not need renameMapOutputForReduce() anymore? TEST!
${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for containers;
contains launch_container.sh script, which, when executed, creates symlinks and
sets up env
"$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
"$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
"$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done
OHO! no further need for this at all? $taskId is unique per subtask
now => should work fine to leave alone. TODO: test with teragen or
similar
*/
/**
* Within the _local_ filesystem (not HDFS), all activity takes place within
* a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
@ -409,14 +386,21 @@ sets up env
* filenames instead of "file.out". (All of this is entirely internal,
* so there are no particular compatibility issues.)
*/
@SuppressWarnings("deprecation")
private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
MapOutputFile subMapOutputFile)
throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
// move map output to reduce input
Path mapOut = subMapOutputFile.getOutputFile();
FileStatus mStatus = localFs.getFileStatus(mapOut);
Path reduceIn = subMapOutputFile.getInputFileForWrite(
TypeConverter.fromYarn(mapId).getTaskID(), localFs.getLength(mapOut));
TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming map output file for task attempt "
+ mapId.toString() + " from original location " + mapOut.toString()
+ " to destination " + reduceIn.toString());
}
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
@ -429,8 +413,7 @@ sets up env
* Also within the local filesystem, we need to restore the initial state
* of the directory as much as possible. Compare current contents against
* the saved original state and nuke everything that doesn't belong, with
* the exception of the renamed map outputs (see above).
FIXME: do we really need to worry about renamed map outputs, or already moved to output dir on commit? if latter, fix comment
* the exception of the renamed map outputs.
*
* Any jobs that go out of their way to rename or delete things from the
* local directory are considered broken and deserve what they get...

View File

@ -236,6 +236,13 @@ public class MapReduceChildJVM {
getTaskLogFile(TaskLog.LogName.PROFILE)
)
);
if (task.isMapTask()) {
vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
}
else {
vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
}
}
}

View File

@ -156,6 +156,7 @@ public class MRAppMaster extends CompositeService {
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
@ -265,8 +266,9 @@ public class MRAppMaster extends CompositeService {
addIfService(speculator);
}
speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
dispatcher.register(Speculator.EventType.class,
new SpeculatorEventDispatcher(conf));
speculatorEventDispatcher);
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(clientService, context);
@ -790,10 +792,6 @@ public class MRAppMaster extends CompositeService {
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
// send init to speculator. This won't yest start as dispatcher isn't
// started yet.
dispatcher.getEventHandler().handle(
new SpeculatorEvent(job.getID(), clock.getTime()));
// JobImpl's InitTransition is done (call above is synchronous), so the
// "uber-decision" (MR-1220) has been made. Query job and switch to
@ -801,9 +799,15 @@ public class MRAppMaster extends CompositeService {
// and container-launcher services/event-handlers).
if (job.isUber()) {
speculatorEventDispatcher.disableSpeculation();
LOG.info("MRAppMaster uberizing job " + job.getID()
+ " in local container (\"uber-AM\").");
+ " in local container (\"uber-AM\") on node "
+ nmHost + ":" + nmPort + ".");
} else {
// send init to speculator only for non-uber jobs.
// This won't yet start as dispatcher isn't started yet.
dispatcher.getEventHandler().handle(
new SpeculatorEvent(job.getID(), clock.getTime()));
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
@ -865,17 +869,24 @@ public class MRAppMaster extends CompositeService {
private class SpeculatorEventDispatcher implements
EventHandler<SpeculatorEvent> {
private final Configuration conf;
private volatile boolean disabled;
public SpeculatorEventDispatcher(Configuration config) {
this.conf = config;
}
@Override
public void handle(SpeculatorEvent event) {
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
if (!disabled &&
(conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) {
// Speculator IS enabled, direct the event to there.
speculator.handle(event);
}
}
public void disableSpeculation() {
disabled = true;
}
}
private static void validateInputParam(String value, String param)

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@ -583,13 +584,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, remoteJobConfFile.toString(), amInfos);
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
}
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
computeProgress(mapTasks), computeProgress(reduceTasks),
cleanupProgress, remoteJobConfFile.toString(), amInfos);
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} finally {
readLock.unlock();
}
@ -812,6 +813,129 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return amInfos;
}
/**
* Decide whether job can be run in uber mode based on various criteria.
* @param dataInputLength Total length for all splits
*/
private void makeUberDecision(long dataInputLength) {
//FIXME: need new memory criterion for uber-decision (oops, too late here;
// until AM-resizing supported,
// must depend on job client to pass fat-slot needs)
// these are no longer "system" settings, necessarily; user may override
int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
//FIXME: handling multiple reduces within a single AM does not seem to
//work.
// int sysMaxReduces =
// job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
int sysMaxReduces = 1;
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
// wrong; get FS from [File?]InputFormat and default block size from that
long sysMemSizeForUberSlot =
conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
boolean uberEnabled =
conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
boolean smallInput = (dataInputLength <= sysMaxBytes);
// ignoring overhead due to UberAM and statics as negligible here:
boolean smallMemory =
( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
<= sysMemSizeForUberSlot)
|| (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
boolean notChainJob = !isChainJob(conf);
// User has overall veto power over uberization, or user can modify
// limits (overriding system settings and potentially shooting
// themselves in the head). Note that ChainMapper/Reducer are
// fundamentally incompatible with MR-1220; they employ a blocking
// queue between the maps/reduces and thus require parallel execution,
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
// and thus requires sequential execution.
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
&& smallInput && smallMemory && notChainJob;
if (isUber) {
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
+ numReduceTasks + "r tasks (" + dataInputLength
+ " input bytes) will run sequentially on single node.");
// make sure reduces are scheduled only after all map are completed
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
1.0f);
// uber-subtask attempts all get launched on same node; if one fails,
// probably should retry elsewhere, i.e., move entire uber-AM: ergo,
// limit attempts to 1 (or at most 2? probably not...)
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
} else {
StringBuilder msg = new StringBuilder();
msg.append("Not uberizing ").append(jobId).append(" because:");
if (!uberEnabled)
msg.append(" not enabled;");
if (!smallNumMapTasks)
msg.append(" too many maps;");
if (!smallNumReduceTasks)
msg.append(" too many reduces;");
if (!smallInput)
msg.append(" too much input;");
if (!smallMemory)
msg.append(" too much RAM;");
if (!notChainJob)
msg.append(" chainjob");
LOG.info(msg.toString());
}
}
/**
* ChainMapper and ChainReducer must execute in parallel, so they're not
* compatible with uberization/LocalContainerLauncher (100% sequential).
*/
private boolean isChainJob(Configuration conf) {
boolean isChainJob = false;
try {
String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
if (mapClassName != null) {
Class<?> mapClass = Class.forName(mapClassName);
if (ChainMapper.class.isAssignableFrom(mapClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainMapper
}
try {
String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
if (reduceClassName != null) {
Class<?> reduceClass = Class.forName(reduceClassName);
if (ChainReducer.class.isAssignableFrom(reduceClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainReducer
}
return isChainJob;
}
/*
private int getBlockSize() {
String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
if (inputClassName != null) {
Class<?> inputClass - Class.forName(inputClassName);
if (FileInputFormat<K, V>)
}
}
*/
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
@ -863,80 +987,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
//FIXME: need new memory criterion for uber-decision (oops, too late here;
// until AM-resizing supported, must depend on job client to pass fat-slot needs)
// these are no longer "system" settings, necessarily; user may override
int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
int sysMaxReduces =
job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
// wrong; get FS from [File?]InputFormat and default block size from that
//long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot();
// FIXME [could use default AM-container memory size...]
boolean uberEnabled =
job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
boolean smallNumMapTasks = (job.numMapTasks <= sysMaxMaps);
boolean smallNumReduceTasks = (job.numReduceTasks <= sysMaxReduces);
boolean smallInput = (inputLength <= sysMaxBytes);
boolean smallMemory = true; //FIXME (see above)
// ignoring overhead due to UberTask and statics as negligible here:
// FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
// || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
boolean notChainJob = !isChainJob(job.conf);
// User has overall veto power over uberization, or user can modify
// limits (overriding system settings and potentially shooting
// themselves in the head). Note that ChainMapper/Reducer are
// fundamentally incompatible with MR-1220; they employ a blocking
// User has overall veto power over uberization, or user can modify
// limits (overriding system settings and potentially shooting
// themselves in the head). Note that ChainMapper/Reducer are
// fundamentally incompatible with MR-1220; they employ a blocking
// queue between the maps/reduces and thus require parallel execution,
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
// and thus requires sequential execution.
job.isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
&& smallInput && smallMemory && notChainJob;
if (job.isUber) {
LOG.info("Uberizing job " + job.jobId + ": " + job.numMapTasks + "m+"
+ job.numReduceTasks + "r tasks (" + inputLength
+ " input bytes) will run sequentially on single node.");
//TODO: also note which node?
// make sure reduces are scheduled only after all map are completed
job.conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
1.0f);
// uber-subtask attempts all get launched on same node; if one fails,
// probably should retry elsewhere, i.e., move entire uber-AM: ergo,
// limit attempts to 1 (or at most 2? probably not...)
job.conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation: makes no sense to speculate an entire job
//canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old
//version, ultimately was from conf.getMapSpeculativeExecution(),
//conf.getReduceSpeculativeExecution()]
} else {
StringBuilder msg = new StringBuilder();
msg.append("Not uberizing ").append(job.jobId).append(" because:");
if (!uberEnabled)
msg.append(" not enabled;");
if (!smallNumMapTasks)
msg.append(" too many maps;");
if (!smallNumReduceTasks)
msg.append(" too many reduces;");
if (!smallInput)
msg.append(" too much input;");
if (!smallMemory)
msg.append(" too much RAM;");
if (!notChainJob)
msg.append(" chainjob");
LOG.info(msg.toString());
}
job.makeUberDecision(inputLength);
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
@ -1008,35 +1059,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
}
/**
* ChainMapper and ChainReducer must execute in parallel, so they're not
* compatible with uberization/LocalContainerLauncher (100% sequential).
*/
boolean isChainJob(Configuration conf) {
boolean isChainJob = false;
try {
String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
if (mapClassName != null) {
Class<?> mapClass = Class.forName(mapClassName);
if (ChainMapper.class.isAssignableFrom(mapClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainMapper
}
try {
String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
if (reduceClassName != null) {
Class<?> reduceClass = Class.forName(reduceClassName);
if (ChainReducer.class.isAssignableFrom(reduceClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainReducer
}
return isChainJob;
}
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {

View File

@ -60,8 +60,8 @@ public class LocalContainerAllocator extends RMCommunicator
private static final Log LOG =
LogFactory.getLog(LocalContainerAllocator.class);
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
// private final ApplicationId appID;
private AtomicInteger containerCount = new AtomicInteger();
private long retryInterval;
private long retrystartTime;
@ -73,8 +73,6 @@ public class LocalContainerAllocator extends RMCommunicator
AppContext context) {
super(clientService, context);
this.eventHandler = context.getEventHandler();
// this.appID = context.getApplicationID();
}
@Override
@ -88,6 +86,7 @@ public class LocalContainerAllocator extends RMCommunicator
retrystartTime = System.currentTimeMillis();
}
@SuppressWarnings("unchecked")
@Override
protected synchronized void heartbeat() throws Exception {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
@ -124,6 +123,7 @@ public class LocalContainerAllocator extends RMCommunicator
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerAllocatorEvent event) {
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -65,7 +66,7 @@ public abstract class RMCommunicator extends AbstractService {
private int rmPollInterval;//millis
protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId;
private volatile boolean stopped;
private AtomicBoolean stopped;
protected Thread allocatorThread;
protected EventHandler eventHandler;
protected AMRMProtocol scheduler;
@ -88,6 +89,7 @@ public abstract class RMCommunicator extends AbstractService {
this.eventHandler = context.getEventHandler();
this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId();
this.stopped = new AtomicBoolean(false);
}
@Override
@ -213,7 +215,10 @@ public abstract class RMCommunicator extends AbstractService {
@Override
public void stop() {
stopped = true;
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
allocatorThread.interrupt();
try {
allocatorThread.join();
@ -228,7 +233,7 @@ public abstract class RMCommunicator extends AbstractService {
allocatorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(rmPollInterval);
try {

View File

@ -183,6 +183,7 @@ public class TestMRClientService {
Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
.getAttemptId());
Assert.assertTrue(amInfo.getStartTime() > 0);
Assert.assertEquals(false, jr.isUber());
}
private void verifyTaskAttemptReport(TaskAttemptReport tar) {

View File

@ -118,7 +118,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -195,7 +195,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -261,7 +261,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -375,7 +375,7 @@ public class TestRMContainerAllocator {
public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
this.reduceProgress, this.cleanupProgress, "jobfile", null);
this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
}
}
@ -511,7 +511,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@ -610,7 +610,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null));
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);

View File

@ -288,7 +288,7 @@ public class TypeConverter {
.getMapProgress(), jobreport.getReduceProgress(), jobreport
.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
.getJobFile(), trackingUrl);
.getJobFile(), trackingUrl, jobreport.isUber());
jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus;
}
@ -421,7 +421,7 @@ public class TypeConverter {
TypeConverter.fromYarn(application.getYarnApplicationState()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL,
application.getUser(), application.getName(),
application.getQueue(), jobFile, trackingUrl
application.getQueue(), jobFile, trackingUrl, false
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());

View File

@ -36,6 +36,7 @@ public interface JobReport {
public abstract String getDiagnostics();
public abstract String getJobFile();
public abstract List<AMInfo> getAMInfos();
public abstract boolean isUber();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@ -52,4 +53,5 @@ public interface JobReport {
public abstract void setDiagnostics(String diagnostics);
public abstract void setJobFile(String jobFile);
public abstract void setAMInfos(List<AMInfo> amInfos);
public abstract void setIsUber(boolean isUber);
}

View File

@ -332,4 +332,16 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
private JobState convertFromProtoFormat(JobStateProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
@Override
public synchronized boolean isUber() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getIsUber();
}
@Override
public synchronized void setIsUber(boolean isUber) {
maybeInitBuilder();
builder.setIsUber(isUber);
}
}

View File

@ -60,7 +60,8 @@ public class MRBuilderUtils {
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress, String jobFile, List<AMInfo> amInfos) {
float cleanupProgress, String jobFile, List<AMInfo> amInfos,
boolean isUber) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
@ -75,6 +76,7 @@ public class MRBuilderUtils {
report.setReduceProgress(reduceProgress);
report.setJobFile(jobFile);
report.setAMInfos(amInfos);
report.setIsUber(isUber);
return report;
}

View File

@ -152,6 +152,7 @@ message JobReportProto {
optional string jobFile = 13;
repeated AMInfoProto am_infos = 14;
optional int64 submit_time = 15;
optional bool is_uber = 16 [default = false];
}
message AMInfoProto {

View File

@ -135,7 +135,8 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
String user, String jobName, String jobFile,
String trackingUrl) {
this(jobid, 0.0f, mapProgress, reduceProgress,
cleanupProgress, runState, jp, user, jobName, jobFile, trackingUrl);
cleanupProgress, runState, jp, user, jobName, jobFile,
trackingUrl);
}
/**
@ -157,8 +158,30 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
int runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp,
user, jobName, "default", jobFile, trackingUrl);
runState, jp, user, jobName, "default", jobFile, trackingUrl);
}
/**
* Create a job status object for a given jobid.
* @param jobid The jobid of the job
* @param setupProgress The progress made on the setup
* @param mapProgress The progress made on the maps
* @param reduceProgress The progress made on the reduces
* @param cleanupProgress The progress made on the cleanup
* @param runState The current state of the job
* @param jp Priority of the job.
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
* @param isUber Whether job running in uber mode
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress,
int runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl, boolean isUber) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp, user, jobName, "default", jobFile, trackingUrl, isUber);
}
/**
@ -181,11 +204,36 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
int runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl) {
super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()),
user, jobName, queue, jobFile, trackingUrl);
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp,
user, jobName, queue, jobFile, trackingUrl, false);
}
/**
* Create a job status object for a given jobid.
* @param jobid The jobid of the job
* @param setupProgress The progress made on the setup
* @param mapProgress The progress made on the maps
* @param reduceProgress The progress made on the reduces
* @param cleanupProgress The progress made on the cleanup
* @param runState The current state of the job
* @param jp Priority of the job.
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param queue job queue name.
* @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
* @param isUber Whether job running in uber mode
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress,
int runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl, boolean isUber) {
super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()),
user, jobName, queue, jobFile, trackingUrl, isUber);
}
public static JobStatus downgrade(org.apache.hadoop.mapreduce.JobStatus stat){
JobStatus old = new JobStatus(JobID.downgrade(stat.getJobID()),
@ -193,7 +241,7 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
stat.getCleanupProgress(), stat.getState().getValue(),
JobPriority.valueOf(stat.getPriority().name()),
stat.getUsername(), stat.getJobName(), stat.getJobFile(),
stat.getTrackingUrl());
stat.getTrackingUrl(), stat.isUber());
old.setStartTime(stat.getStartTime());
old.setFinishTime(stat.getFinishTime());
old.setSchedulingInfo(stat.getSchedulingInfo());

View File

@ -467,6 +467,7 @@ public class Job extends JobContextImpl implements JobContext {
sb.append("Job File: ").append(status.getJobFile()).append("\n");
sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
sb.append("\n");
sb.append("Uber job : ").append(status.isUber()).append("\n");
sb.append("map() completion: ");
sb.append(status.getMapProgress()).append("\n");
sb.append("reduce() completion: ");
@ -1268,12 +1269,20 @@ public class Job extends JobContextImpl implements JobContext {
Job.getProgressPollInterval(clientConf);
/* make sure to report full progress after the job is done */
boolean reportedAfterCompletion = false;
boolean reportedUberMode = false;
while (!isComplete() || !reportedAfterCompletion) {
if (isComplete()) {
reportedAfterCompletion = true;
} else {
Thread.sleep(progMonitorPollIntervalMillis);
}
if (status.getState() == JobStatus.State.PREP) {
continue;
}
if (!reportedUberMode) {
reportedUberMode = true;
LOG.info("Job " + jobId + " running in uber mode : " + isUber());
}
String report =
(" map " + StringUtils.formatPercent(mapProgress(), 0)+
" reduce " +
@ -1497,4 +1506,10 @@ public class Job extends JobContextImpl implements JobContext {
conf.set(Job.OUTPUT_FILTER, newValue.toString());
}
public boolean isUber() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.isUber();
}
}

View File

@ -97,7 +97,7 @@ public class JobStatus implements Writable, Cloneable {
private int usedMem;
private int reservedMem;
private int neededMem;
private boolean isUber;
/**
*/
@ -123,7 +123,7 @@ public class JobStatus implements Writable, Cloneable {
State runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp, user, jobName, "default", jobFile, trackingUrl);
runState, jp, user, jobName, "default", jobFile, trackingUrl, false);
}
/**
@ -146,6 +146,31 @@ public class JobStatus implements Writable, Cloneable {
State runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
runState, jp, user, jobName, queue, jobFile, trackingUrl, false);
}
/**
* Create a job status object for a given jobid.
* @param jobid The jobid of the job
* @param setupProgress The progress made on the setup
* @param mapProgress The progress made on the maps
* @param reduceProgress The progress made on the reduces
* @param cleanupProgress The progress made on the cleanup
* @param runState The current state of the job
* @param jp Priority of the job.
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param queue queue name
* @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
* @param isUber Whether job running in uber mode
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress,
State runState, JobPriority jp,
String user, String jobName, String queue,
String jobFile, String trackingUrl, boolean isUber) {
this.jobid = jobid;
this.setupProgress = setupProgress;
this.mapProgress = mapProgress;
@ -161,6 +186,7 @@ public class JobStatus implements Writable, Cloneable {
this.jobName = jobName;
this.jobFile = jobFile;
this.trackingUrl = trackingUrl;
this.isUber = isUber;
}
@ -411,6 +437,7 @@ public class JobStatus implements Writable, Cloneable {
Text.writeString(out, jobName);
Text.writeString(out, trackingUrl);
Text.writeString(out, jobFile);
out.writeBoolean(isUber);
// Serialize the job's ACLs
out.writeInt(jobACLs.size());
@ -438,6 +465,7 @@ public class JobStatus implements Writable, Cloneable {
this.jobName = Text.readString(in);
this.trackingUrl = Text.readString(in);
this.jobFile = Text.readString(in);
this.isUber = in.readBoolean();
// De-serialize the job's ACLs
int numACLs = in.readInt();
@ -562,9 +590,26 @@ public class JobStatus implements Writable, Cloneable {
this.neededMem = n;
}
/**
* Whether job running in uber mode
* @return job in uber-mode
*/
public synchronized boolean isUber() {
return isUber;
}
/**
* Set uber-mode flag
* @param isUber Whether job running in uber-mode
*/
public synchronized void setUber(boolean isUber) {
this.isUber = isUber;
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("job-id : " + jobid);
buffer.append("uber-mode : " + isUber);
buffer.append("map-progress : " + mapProgress);
buffer.append("reduce-progress : " + reduceProgress);
buffer.append("cleanup-progress : " + cleanupProgress);

View File

@ -150,6 +150,10 @@ public interface MRJobConfig {
public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces";
public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params";
public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
public static final String TASK_ID = "mapreduce.task.id";
@ -298,12 +302,6 @@ public interface MRJobConfig {
"mapreduce.job.ubertask.maxreduces";
public static final String JOB_UBERTASK_MAXBYTES =
"mapreduce.job.ubertask.maxbytes";
public static final String UBERTASK_JAVA_OPTS =
"mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts?
public static final String UBERTASK_ULIMIT =
"mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit?
public static final String UBERTASK_ENV =
"mapreduce.ubertask.child.env"; // or mapreduce.uber.env?
public static final String MR_PREFIX = "yarn.app.mapreduce.";

View File

@ -63,17 +63,20 @@ public class TestJobMonitorAndPrint extends TestCase {
when(cluster.getConf()).thenReturn(conf);
when(cluster.getClient()).thenReturn(clientProtocol);
JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
"tmp-jobfile", "tmp-url");
job = Job.getInstance(cluster, jobStatus, conf);
job = spy(job);
}
@Test
public void testJobMonitorAndPrint() throws Exception {
JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f, 0.1f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f, 1f, 1f,
State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
"tmp-queue", "tmp-jobfile", "tmp-url", true);
JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
"tmp-queue", "tmp-jobfile", "tmp-url", true);
doAnswer(
new Answer<TaskCompletionEvent[]>() {
@ -102,15 +105,21 @@ public class TestJobMonitorAndPrint extends TestCase {
String line;
boolean foundHundred = false;
boolean foundComplete = false;
String match_1 = "map 100% reduce 100%";
String match_2 = "completed successfully";
boolean foundUber = false;
String match_1 = "uber mode : true";
String match_2 = "map 100% reduce 100%";
String match_3 = "completed successfully";
while ((line = r.readLine()) != null) {
foundHundred = line.contains(match_1);
if (line.contains(match_1)) {
foundUber = true;
}
foundHundred = line.contains(match_2);
if (foundHundred)
break;
}
line = r.readLine();
foundComplete = line.contains(match_2);
foundComplete = line.contains(match_3);
assertTrue(foundUber);
assertTrue(foundHundred);
assertTrue(foundComplete);
}

View File

@ -107,6 +107,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
.toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
report.setAMInfos(getAMInfos());
report.setIsUber(isUber());
}
@Override

View File

@ -168,7 +168,7 @@ public class TestClientServiceDelegate {
GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
when(jobReportResponse1.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
// First AM returns a report with jobName firstGen and simulates AM shutdown
// on second invocation.
@ -180,7 +180,7 @@ public class TestClientServiceDelegate {
GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
when(jobReportResponse2.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
// Second AM generation returns a report with jobName secondGen
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);

View File

@ -49,6 +49,7 @@ public class TestUberAM extends TestMRJobs {
}
@Override
@Test
public void testSleepJob()
throws IOException, InterruptedException, ClassNotFoundException {
if (mrCluster != null) {
@ -84,6 +85,7 @@ public class TestUberAM extends TestMRJobs {
}
@Override
@Test
public void testRandomWriter()
throws IOException, InterruptedException, ClassNotFoundException {
super.testRandomWriter();
@ -101,6 +103,7 @@ public class TestUberAM extends TestMRJobs {
}
@Override
@Test
public void testFailingMapper()
throws IOException, InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting uberized testFailingMapper().");