diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7f43ec1c0bb..96c39bf6fa2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -23,6 +23,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current attempt is the last retry. (Wangda Tan via zjshen) + MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader + enabled if custom output format/committer is used (Sangjin Lee via jlowe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index b6090343a15..79e024388f6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; @@ -198,6 +197,7 @@ public class MRAppMaster extends CompositeService { new JobTokenSecretManager(); private JobId jobId; private boolean newApiCommitter; + private ClassLoader jobClassLoader; private OutputCommitter committer; private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; @@ -247,6 +247,9 @@ public class MRAppMaster extends CompositeService { 
@Override protected void serviceInit(final Configuration conf) throws Exception { + // create the job classloader if enabled + createJobClassLoader(conf); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); initJobCredentialsAndUGI(conf); @@ -446,33 +449,37 @@ public class MRAppMaster extends CompositeService { } private OutputCommitter createOutputCommitter(Configuration conf) { - OutputCommitter committer = null; + return callWithJobClassLoader(conf, new Action() { + public OutputCommitter call(Configuration conf) { + OutputCommitter committer = null; - LOG.info("OutputCommitter set in config " - + conf.get("mapred.output.committer.class")); + LOG.info("OutputCommitter set in config " + + conf.get("mapred.output.committer.class")); - if (newApiCommitter) { - org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils - .newTaskId(jobId, 0, TaskType.MAP); - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils - .newTaskAttemptId(taskID, 0); - TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, - TypeConverter.fromYarn(attemptID)); - OutputFormat outputFormat; - try { - outputFormat = ReflectionUtils.newInstance(taskContext - .getOutputFormatClass(), conf); - committer = outputFormat.getOutputCommitter(taskContext); - } catch (Exception e) { - throw new YarnRuntimeException(e); + if (newApiCommitter) { + org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = + MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = + MRBuilderUtils.newTaskAttemptId(taskID, 0); + TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, + TypeConverter.fromYarn(attemptID)); + OutputFormat outputFormat; + try { + outputFormat = ReflectionUtils.newInstance(taskContext + .getOutputFormatClass(), conf); + committer = outputFormat.getOutputCommitter(taskContext); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { 
+ committer = ReflectionUtils.newInstance(conf.getClass( + "mapred.output.committer.class", FileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class), conf); + } + LOG.info("OutputCommitter is " + committer.getClass().getName()); + return committer; } - } else { - committer = ReflectionUtils.newInstance(conf.getClass( - "mapred.output.committer.class", FileOutputCommitter.class, - org.apache.hadoop.mapred.OutputCommitter.class), conf); - } - LOG.info("OutputCommitter is " + committer.getClass().getName()); - return committer; + }); } protected boolean keepJobFiles(JobConf conf) { @@ -654,38 +661,42 @@ public class MRAppMaster extends CompositeService { return new StagingDirCleaningService(); } - protected Speculator createSpeculator(Configuration conf, AppContext context) { - Class speculatorClass; + protected Speculator createSpeculator(Configuration conf, + final AppContext context) { + return callWithJobClassLoader(conf, new Action() { + public Speculator call(Configuration conf) { + Class speculatorClass; + try { + speculatorClass + // "yarn.mapreduce.job.speculator.class" + = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, + DefaultSpeculator.class, + Speculator.class); + Constructor speculatorConstructor + = speculatorClass.getConstructor + (Configuration.class, AppContext.class); + Speculator result = speculatorConstructor.newInstance(conf, context); - try { - speculatorClass - // "yarn.mapreduce.job.speculator.class" - = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, - DefaultSpeculator.class, - Speculator.class); - Constructor speculatorConstructor - = speculatorClass.getConstructor - (Configuration.class, AppContext.class); - Speculator result = speculatorConstructor.newInstance(conf, context); - - return result; - } catch (InstantiationException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (IllegalAccessException ex) { - 
LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (InvocationTargetException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (NoSuchMethodException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } + return result; + } catch (InstantiationException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (IllegalAccessException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (InvocationTargetException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (NoSuchMethodException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } + } + }); } protected TaskAttemptListener createTaskAttemptListener(AppContext context) { @@ -698,7 +709,7 @@ public class MRAppMaster extends CompositeService { protected EventHandler createCommitterEventHandler( AppContext context, OutputCommitter committer) { return new CommitterEventHandler(context, committer, - getRMHeartbeatHandler()); + getRMHeartbeatHandler(), jobClassLoader); } protected ContainerAllocator createContainerAllocator( @@ -1069,8 +1080,8 @@ public class MRAppMaster extends CompositeService { //start all the components super.serviceStart(); - // set job classloader if configured - MRApps.setJobClassLoader(getConfig()); + // finally set the job classloader + MRApps.setClassLoader(jobClassLoader, getConfig()); if (initFailed) { JobEvent initFailedEvent = new JobEvent(job.getID(), 
JobEventType.JOB_INIT_FAILED); @@ -1087,19 +1098,24 @@ public class MRAppMaster extends CompositeService { TaskLog.syncLogsShutdown(logSyncer); } - private boolean isRecoverySupported(OutputCommitter committer2) - throws IOException { + private boolean isRecoverySupported() throws IOException { boolean isSupported = false; - JobContext _jobContext; + Configuration conf = getConfig(); if (committer != null) { + final JobContext _jobContext; if (newApiCommitter) { _jobContext = new JobContextImpl( - getConfig(), TypeConverter.fromYarn(getJobId())); + conf, TypeConverter.fromYarn(getJobId())); } else { _jobContext = new org.apache.hadoop.mapred.JobContextImpl( - new JobConf(getConfig()), TypeConverter.fromYarn(getJobId())); + new JobConf(conf), TypeConverter.fromYarn(getJobId())); } - isSupported = committer.isRecoverySupported(_jobContext); + isSupported = callWithJobClassLoader(conf, + new ExceptionAction() { + public Boolean call(Configuration conf) throws IOException { + return committer.isRecoverySupported(_jobContext); + } + }); } return isSupported; } @@ -1113,7 +1129,7 @@ public class MRAppMaster extends CompositeService { MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); - boolean recoverySupportedByCommitter = isRecoverySupported(committer); + boolean recoverySupportedByCommitter = isRecoverySupported(); // If a shuffle secret was not provided by the job client then this app // attempt will generate one. 
However that disables recovery if there @@ -1298,7 +1314,7 @@ public class MRAppMaster extends CompositeService { this.conf = config; } @Override - public void handle(SpeculatorEvent event) { + public void handle(final SpeculatorEvent event) { if (disabled) { return; } @@ -1325,7 +1341,12 @@ public class MRAppMaster extends CompositeService { if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP)) || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) { // Speculator IS enabled, direct the event to there. - speculator.handle(event); + callWithJobClassLoader(conf, new Action() { + public Void call(Configuration conf) { + speculator.handle(event); + return null; + } + }); } } @@ -1479,6 +1500,102 @@ public class MRAppMaster extends CompositeService { }); } + /** + * Creates a job classloader based on the configuration if the job classloader + * is enabled. It is a no-op if the job classloader is not enabled. + */ + private void createJobClassLoader(Configuration conf) throws IOException { + jobClassLoader = MRApps.createJobClassLoader(conf); + } + + /** + * Executes the given action with the job classloader set as the configuration + * classloader as well as the thread context class loader if the job + * classloader is enabled. After the call, the original classloader is + * restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. 
+ * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + */ + T callWithJobClassLoader(Configuration conf, Action action) { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Executes the given action that can throw a checked exception with the job + * classloader set as the configuration classloader as well as the thread + * context class loader if the job classloader is enabled. After the call, the + * original classloader is restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. 
+ * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + * @throws IOException if the underlying action throws an IOException + * @throws YarnRuntimeException if the underlying action throws an exception + * other than an IOException + */ + T callWithJobClassLoader(Configuration conf, ExceptionAction action) + throws IOException { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } catch (IOException e) { + throw e; + } catch (YarnRuntimeException e) { + throw e; + } catch (Exception e) { + // wrap it with a YarnRuntimeException + throw new YarnRuntimeException(e); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Action to be wrapped with setting and unsetting the job classloader + */ + private static interface Action { + T call(Configuration conf); + } + + private static interface ExceptionAction { + T call(Configuration conf) throws Exception; + } + @Override protected void serviceStop() throws Exception { super.serviceStop(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java index f15bce23577..8c3be86cb11 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java @@ -68,6 +68,7 @@ public class CommitterEventHandler extends AbstractService private BlockingQueue eventQueue = new LinkedBlockingQueue(); private final AtomicBoolean stopped; + private final ClassLoader jobClassLoader; private Thread jobCommitThread = null; private int commitThreadCancelTimeoutMs; private long commitWindowMs; @@ -79,11 +80,17 @@ public class CommitterEventHandler extends AbstractService public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler) { + this(context, committer, rmHeartbeatHandler, null); + } + + public CommitterEventHandler(AppContext context, OutputCommitter committer, + RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) { super("CommitterEventHandler"); this.context = context; this.committer = committer; this.rmHeartbeatHandler = rmHeartbeatHandler; this.stopped = new AtomicBoolean(false); + this.jobClassLoader = jobClassLoader; } @Override @@ -109,9 +116,23 @@ public class CommitterEventHandler extends AbstractService @Override protected void serviceStart() throws Exception { - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("CommitterEvent Processor #%d") - .build(); + ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder() + .setNameFormat("CommitterEvent Processor #%d"); + if (jobClassLoader != null) { + // if the job classloader is enabled, we need to use the job classloader + // as the thread context classloader (TCCL) of these threads in case the + // committer needs to load another class via TCCL + ThreadFactory backingTf = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = 
new Thread(r); + thread.setContextClassLoader(jobClassLoader); + return thread; + } + }; + tfBuilder.setThreadFactory(backingTf); + } + ThreadFactory tf = tfBuilder.build(); launcherPool = new ThreadPoolExecutor(5, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); eventHandlingThread = new Thread(new Runnable() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index d72a1bff269..301cdd50be2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -327,8 +327,8 @@ public class MRApps extends Apps { } /** - * Sets a {@link ApplicationClassLoader} on the given configuration and as - * the context classloader, if + * Creates and sets an {@link ApplicationClassLoader} on the given + * configuration and as the thread context classloader, if * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and * the APP_CLASSPATH environment variable is set. * @param conf @@ -336,24 +336,52 @@ public class MRApps extends Apps { */ public static void setJobClassLoader(Configuration conf) throws IOException { + setClassLoader(createJobClassLoader(conf), conf); + } + + /** + * Creates an {@link ApplicationClassLoader} if + * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and + * the APP_CLASSPATH environment variable is set. 
+ * @param conf + * @return the created job classloader, or null if the job classloader is not + * enabled or the APP_CLASSPATH environment variable is not set + * @throws IOException + */ + public static ClassLoader createJobClassLoader(Configuration conf) + throws IOException { + ClassLoader jobClassLoader = null; if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) { String appClasspath = System.getenv(Environment.APP_CLASSPATH.key()); if (appClasspath == null) { - LOG.warn("Not using job classloader since APP_CLASSPATH is not set."); + LOG.warn("Not creating job classloader since APP_CLASSPATH is not set."); } else { - LOG.info("Using job classloader"); + LOG.info("Creating job classloader"); if (LOG.isDebugEnabled()) { LOG.debug("APP_CLASSPATH=" + appClasspath); } String[] systemClasses = getSystemClasses(conf); - ClassLoader jobClassLoader = createJobClassLoader(appClasspath, + jobClassLoader = createJobClassLoader(appClasspath, systemClasses); - if (jobClassLoader != null) { - conf.setClassLoader(jobClassLoader); - Thread.currentThread().setContextClassLoader(jobClassLoader); - } } } + return jobClassLoader; + } + + /** + * Sets the provided classloader on the given configuration and as the thread + * context classloader if the classloader is not null. 
+ * @param classLoader + * @param conf + */ + public static void setClassLoader(ClassLoader classLoader, + Configuration conf) { + if (classLoader != null) { + LOG.info("Setting classloader " + classLoader.getClass().getName() + + " on the configuration and as the thread context classloader"); + conf.setClassLoader(classLoader); + Thread.currentThread().setContextClassLoader(classLoader); + } } @VisibleForTesting diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 2027d37b362..6b47554e8eb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -33,8 +33,8 @@ import java.util.HashMap; import java.util.Map; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; -import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.FailingMapper; @@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; +import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; +import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.ApplicationClassLoader; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; import org.junit.AfterClass; @@ -210,7 +215,19 @@ public class TestMRJobs { @Test(timeout = 300000) public void testJobClassloader() throws IOException, InterruptedException, ClassNotFoundException { - LOG.info("\n\n\nStarting testJobClassloader()."); + testJobClassloader(false); + } + + @Test(timeout = 300000) + public void testJobClassloaderWithCustomClasses() throws IOException, + InterruptedException, ClassNotFoundException { + testJobClassloader(true); + } + + private void testJobClassloader(boolean useCustomClasses) throws IOException, + InterruptedException, ClassNotFoundException { + LOG.info("\n\n\nStarting testJobClassloader()" + + " useCustomClasses=" + useCustomClasses); if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR @@ -221,6 +238,19 @@ public class TestMRJobs { // set master address to local to test that local mode applied iff framework == local sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true); + if (useCustomClasses) { + // to test AM loading user classes such as output format class, we want + // to blacklist them from the system classes (they need to be prepended + // as the first match wins) + String systemClasses = + sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES); + // exclude the custom classes from system classes + systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" + + CustomSpeculator.class.getName() + "," + 
+ systemClasses; + sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES, + systemClasses); + } sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB); sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString()); sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString()); @@ -233,12 +263,66 @@ public class TestMRJobs { job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.setJarByClass(SleepJob.class); job.setMaxMapAttempts(1); // speed up failures + if (useCustomClasses) { + // set custom output format class and speculator class + job.setOutputFormatClass(CustomOutputFormat.class); + final Configuration jobConf = job.getConfiguration(); + jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class, + Speculator.class); + // speculation needs to be enabled for the speculator to be loaded + jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true); + } job.submit(); boolean succeeded = job.waitForCompletion(true); Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(), succeeded); } + public static class CustomOutputFormat extends NullOutputFormat { + public CustomOutputFormat() { + verifyClassLoader(getClass()); + } + + /** + * Verifies that the class was loaded by the job classloader if it is in the + * context of the MRAppMaster, and if not throws an exception to fail the + * job. 
+ */ + private void verifyClassLoader(Class cls) { + // to detect that it is instantiated in the context of the MRAppMaster, we + // inspect the stack trace and determine a caller is MRAppMaster + for (StackTraceElement e: new Throwable().getStackTrace()) { + if (e.getClassName().equals(MRAppMaster.class.getName()) && + !(cls.getClassLoader() instanceof ApplicationClassLoader)) { + throw new ExceptionInInitializerError("incorrect classloader used"); + } + } + } + } + + public static class CustomSpeculator extends DefaultSpeculator { + public CustomSpeculator(Configuration conf, AppContext context) { + super(conf, context); + verifyClassLoader(getClass()); + } + + /** + * Verifies that the class was loaded by the job classloader if it is in the + * context of the MRAppMaster, and if not throws an exception to fail the + * job. + */ + private void verifyClassLoader(Class cls) { + // to detect that it is instantiated in the context of the MRAppMaster, we + // inspect the stack trace and determine a caller is MRAppMaster + for (StackTraceElement e: new Throwable().getStackTrace()) { + if (e.getClassName().equals(MRAppMaster.class.getName()) && + !(cls.getClassLoader() instanceof ApplicationClassLoader)) { + throw new ExceptionInInitializerError("incorrect classloader used"); + } + } + } + } + protected void verifySleepJobCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters();