MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader enabled if custom output format/committer is used. Contributed by Sangjin Lee

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612358 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-07-21 17:54:46 +00:00
parent 8a87085820
commit 9e62bcca4e
5 changed files with 342 additions and 84 deletions

View File

@ -169,6 +169,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
attempt is the last retry. (Wangda Tan via zjshen)
MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
enabled if custom output format/committer is used (Sangjin Lee via jlowe)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
@ -200,6 +199,7 @@ public class MRAppMaster extends CompositeService {
new JobTokenSecretManager();
private JobId jobId;
private boolean newApiCommitter;
private ClassLoader jobClassLoader;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
@ -250,6 +250,9 @@ public class MRAppMaster extends CompositeService {
@Override
protected void serviceInit(final Configuration conf) throws Exception {
// create the job classloader if enabled
createJobClassLoader(conf);
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
initJobCredentialsAndUGI(conf);
@ -387,8 +390,13 @@ public class MRAppMaster extends CompositeService {
addIfService(committerEventHandler);
//policy handling preemption requests from RM
callWithJobClassLoader(conf, new Action<Void>() {
public Void call(Configuration conf) {
preemptionPolicy = createPreemptionPolicy(conf);
preemptionPolicy.init(context);
return null;
}
});
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
@ -453,16 +461,18 @@ public class MRAppMaster extends CompositeService {
}
private OutputCommitter createOutputCommitter(Configuration conf) {
return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
public OutputCommitter call(Configuration conf) {
OutputCommitter committer = null;
LOG.info("OutputCommitter set in config "
+ conf.get("mapred.output.committer.class"));
if (newApiCommitter) {
org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
.newTaskId(jobId, 0, TaskType.MAP);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
.newTaskAttemptId(taskID, 0);
org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
MRBuilderUtils.newTaskAttemptId(taskID, 0);
TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attemptID));
OutputFormat outputFormat;
@ -481,6 +491,8 @@ public class MRAppMaster extends CompositeService {
LOG.info("OutputCommitter is " + committer.getClass().getName());
return committer;
}
});
}
protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
return ReflectionUtils.newInstance(conf.getClass(
@ -667,9 +679,11 @@ public class MRAppMaster extends CompositeService {
return new StagingDirCleaningService();
}
protected Speculator createSpeculator(Configuration conf, AppContext context) {
protected Speculator createSpeculator(Configuration conf,
final AppContext context) {
return callWithJobClassLoader(conf, new Action<Speculator>() {
public Speculator call(Configuration conf) {
Class<? extends Speculator> speculatorClass;
try {
speculatorClass
// "yarn.mapreduce.job.speculator.class"
@ -700,6 +714,8 @@ public class MRAppMaster extends CompositeService {
throw new YarnRuntimeException(ex);
}
}
});
}
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
AMPreemptionPolicy preemptionPolicy) {
@ -712,7 +728,7 @@ public class MRAppMaster extends CompositeService {
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer,
getRMHeartbeatHandler());
getRMHeartbeatHandler(), jobClassLoader);
}
protected ContainerAllocator createContainerAllocator(
@ -1083,8 +1099,8 @@ public class MRAppMaster extends CompositeService {
//start all the components
super.serviceStart();
// set job classloader if configured
MRApps.setJobClassLoader(getConfig());
// finally set the job classloader
MRApps.setClassLoader(jobClassLoader, getConfig());
if (initFailed) {
JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
@ -1101,19 +1117,24 @@ public class MRAppMaster extends CompositeService {
TaskLog.syncLogsShutdown(logSyncer);
}
private boolean isRecoverySupported(OutputCommitter committer2)
throws IOException {
private boolean isRecoverySupported() throws IOException {
boolean isSupported = false;
JobContext _jobContext;
Configuration conf = getConfig();
if (committer != null) {
final JobContext _jobContext;
if (newApiCommitter) {
_jobContext = new JobContextImpl(
getConfig(), TypeConverter.fromYarn(getJobId()));
conf, TypeConverter.fromYarn(getJobId()));
} else {
_jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
new JobConf(conf), TypeConverter.fromYarn(getJobId()));
}
isSupported = committer.isRecoverySupported(_jobContext);
isSupported = callWithJobClassLoader(conf,
new ExceptionAction<Boolean>() {
public Boolean call(Configuration conf) throws IOException {
return committer.isRecoverySupported(_jobContext);
}
});
}
return isSupported;
}
@ -1127,7 +1148,7 @@ public class MRAppMaster extends CompositeService {
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
boolean recoverySupportedByCommitter = isRecoverySupported(committer);
boolean recoverySupportedByCommitter = isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
@ -1312,7 +1333,7 @@ public class MRAppMaster extends CompositeService {
this.conf = config;
}
@Override
public void handle(SpeculatorEvent event) {
public void handle(final SpeculatorEvent event) {
if (disabled) {
return;
}
@ -1339,7 +1360,12 @@ public class MRAppMaster extends CompositeService {
if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
|| (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
// Speculator IS enabled, direct the event to there.
callWithJobClassLoader(conf, new Action<Void>() {
public Void call(Configuration conf) {
speculator.handle(event);
return null;
}
});
}
}
@ -1499,6 +1525,102 @@ public class MRAppMaster extends CompositeService {
});
}
/**
* Creates a job classloader based on the configuration if the job classloader
* is enabled. It is a no-op if the job classloader is not enabled.
*/
private void createJobClassLoader(Configuration conf) throws IOException {
jobClassLoader = MRApps.createJobClassLoader(conf);
}
/**
* Executes the given action with the job classloader set as the configuration
* classloader as well as the thread context class loader if the job
* classloader is enabled. After the call, the original classloader is
* restored.
*
* If the job classloader is enabled and the code needs to load user-supplied
* classes via configuration or thread context classloader, this method should
* be used in order to load them.
*
* @param conf the configuration on which the classloader will be set
* @param action the callable action to be executed
*/
<T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
// if the job classloader is enabled, we may need it to load the (custom)
// classes; we make the job classloader available and unset it once it is
// done
ClassLoader currentClassLoader = conf.getClassLoader();
boolean setJobClassLoader =
jobClassLoader != null && currentClassLoader != jobClassLoader;
if (setJobClassLoader) {
MRApps.setClassLoader(jobClassLoader, conf);
}
try {
return action.call(conf);
} finally {
if (setJobClassLoader) {
// restore the original classloader
MRApps.setClassLoader(currentClassLoader, conf);
}
}
}
/**
* Executes the given action that can throw a checked exception with the job
* classloader set as the configuration classloader as well as the thread
* context class loader if the job classloader is enabled. After the call, the
* original classloader is restored.
*
* If the job classloader is enabled and the code needs to load user-supplied
* classes via configuration or thread context classloader, this method should
* be used in order to load them.
*
* @param conf the configuration on which the classloader will be set
* @param action the callable action to be executed
* @throws IOException if the underlying action throws an IOException
* @throws YarnRuntimeException if the underlying action throws an exception
* other than an IOException
*/
<T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
throws IOException {
// if the job classloader is enabled, we may need it to load the (custom)
// classes; we make the job classloader available and unset it once it is
// done
ClassLoader currentClassLoader = conf.getClassLoader();
boolean setJobClassLoader =
jobClassLoader != null && currentClassLoader != jobClassLoader;
if (setJobClassLoader) {
MRApps.setClassLoader(jobClassLoader, conf);
}
try {
return action.call(conf);
} catch (IOException e) {
throw e;
} catch (YarnRuntimeException e) {
throw e;
} catch (Exception e) {
// wrap it with a YarnRuntimeException
throw new YarnRuntimeException(e);
} finally {
if (setJobClassLoader) {
// restore the original classloader
MRApps.setClassLoader(currentClassLoader, conf);
}
}
}
/**
* Action to be wrapped with setting and unsetting the job classloader
*/
private static interface Action<T> {
T call(Configuration conf);
}
private static interface ExceptionAction<T> {
T call(Configuration conf) throws Exception;
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();

View File

@ -68,6 +68,7 @@ public class CommitterEventHandler extends AbstractService
private BlockingQueue<CommitterEvent> eventQueue =
new LinkedBlockingQueue<CommitterEvent>();
private final AtomicBoolean stopped;
private final ClassLoader jobClassLoader;
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
@ -79,11 +80,17 @@ public class CommitterEventHandler extends AbstractService
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
this(context, committer, rmHeartbeatHandler, null);
}
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
super("CommitterEventHandler");
this.context = context;
this.committer = committer;
this.rmHeartbeatHandler = rmHeartbeatHandler;
this.stopped = new AtomicBoolean(false);
this.jobClassLoader = jobClassLoader;
}
@Override
@ -109,9 +116,23 @@ public class CommitterEventHandler extends AbstractService
@Override
protected void serviceStart() throws Exception {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("CommitterEvent Processor #%d")
.build();
ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
.setNameFormat("CommitterEvent Processor #%d");
if (jobClassLoader != null) {
// if the job classloader is enabled, we need to use the job classloader
// as the thread context classloader (TCCL) of these threads in case the
// committer needs to load another class via TCCL
ThreadFactory backingTf = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setContextClassLoader(jobClassLoader);
return thread;
}
};
tfBuilder.setThreadFactory(backingTf);
}
ThreadFactory tf = tfBuilder.build();
launcherPool = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {

View File

@ -327,8 +327,8 @@ public class MRApps extends Apps {
}
/**
* Sets a {@link ApplicationClassLoader} on the given configuration and as
* the context classloader, if
* Creates and sets a {@link ApplicationClassLoader} on the given
* configuration and as the thread context classloader, if
* {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
* the APP_CLASSPATH environment variable is set.
* @param conf
@ -336,23 +336,51 @@ public class MRApps extends Apps {
*/
public static void setJobClassLoader(Configuration conf)
throws IOException {
setClassLoader(createJobClassLoader(conf), conf);
}
/**
* Creates a {@link ApplicationClassLoader} if
* {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
* the APP_CLASSPATH environment variable is set.
* @param conf
* @returns the created job classloader, or null if the job classloader is not
* enabled or the APP_CLASSPATH environment variable is not set
* @throws IOException
*/
public static ClassLoader createJobClassLoader(Configuration conf)
throws IOException {
ClassLoader jobClassLoader = null;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
if (appClasspath == null) {
LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
} else {
LOG.info("Using job classloader");
LOG.info("Creating job classloader");
if (LOG.isDebugEnabled()) {
LOG.debug("APP_CLASSPATH=" + appClasspath);
}
String[] systemClasses = getSystemClasses(conf);
ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
jobClassLoader = createJobClassLoader(appClasspath,
systemClasses);
if (jobClassLoader != null) {
conf.setClassLoader(jobClassLoader);
Thread.currentThread().setContextClassLoader(jobClassLoader);
}
}
return jobClassLoader;
}
/**
* Sets the provided classloader on the given configuration and as the thread
* context classloader if the classloader is not null.
* @param classLoader
* @param conf
*/
public static void setClassLoader(ClassLoader classLoader,
Configuration conf) {
if (classLoader != null) {
LOG.info("Setting classloader " + classLoader.getClass().getName() +
" on the configuration and as the thread context classloader");
conf.setClassLoader(classLoader);
Thread.currentThread().setContextClassLoader(classLoader);
}
}

View File

@ -33,8 +33,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.FailingMapper;
@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
@ -210,7 +215,19 @@ public class TestMRJobs {
@Test(timeout = 300000)
public void testJobClassloader() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testJobClassloader().");
testJobClassloader(false);
}
@Test(timeout = 300000)
public void testJobClassloaderWithCustomClasses() throws IOException,
InterruptedException, ClassNotFoundException {
testJobClassloader(true);
}
private void testJobClassloader(boolean useCustomClasses) throws IOException,
InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting testJobClassloader()"
+ " useCustomClasses=" + useCustomClasses);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@ -221,6 +238,19 @@ public class TestMRJobs {
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
if (useCustomClasses) {
// to test AM loading user classes such as output format class, we want
// to blacklist them from the system classes (they need to be prepended
// as the first match wins)
String systemClasses =
sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
// exclude the custom classes from system classes
systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
CustomSpeculator.class.getName() + "," +
systemClasses;
sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
systemClasses);
}
sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@ -233,12 +263,66 @@ public class TestMRJobs {
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
if (useCustomClasses) {
// set custom output format class and speculator class
job.setOutputFormatClass(CustomOutputFormat.class);
final Configuration jobConf = job.getConfiguration();
jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
Speculator.class);
// speculation needs to be enabled for the speculator to be loaded
jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
}
job.submit();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
succeeded);
}
public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
public CustomOutputFormat() {
verifyClassLoader(getClass());
}
/**
* Verifies that the class was loaded by the job classloader if it is in the
* context of the MRAppMaster, and if not throws an exception to fail the
* job.
*/
private void verifyClassLoader(Class<?> cls) {
// to detect that it is instantiated in the context of the MRAppMaster, we
// inspect the stack trace and determine a caller is MRAppMaster
for (StackTraceElement e: new Throwable().getStackTrace()) {
if (e.getClassName().equals(MRAppMaster.class.getName()) &&
!(cls.getClassLoader() instanceof ApplicationClassLoader)) {
throw new ExceptionInInitializerError("incorrect classloader used");
}
}
}
}
public static class CustomSpeculator extends DefaultSpeculator {
public CustomSpeculator(Configuration conf, AppContext context) {
super(conf, context);
verifyClassLoader(getClass());
}
/**
* Verifies that the class was loaded by the job classloader if it is in the
* context of the MRAppMaster, and if not throws an exception to fail the
* job.
*/
private void verifyClassLoader(Class<?> cls) {
// to detect that it is instantiated in the context of the MRAppMaster, we
// inspect the stack trace and determine a caller is MRAppMaster
for (StackTraceElement e: new Throwable().getStackTrace()) {
if (e.getClassName().equals(MRAppMaster.class.getName()) &&
!(cls.getClassLoader() instanceof ApplicationClassLoader)) {
throw new ExceptionInInitializerError("incorrect classloader used");
}
}
}
}
protected void verifySleepJobCounters(Job job) throws InterruptedException,
IOException {
Counters counters = job.getCounters();