MAPREDUCE-6293. Set job classloader on uber-job's LocalContainerLauncher event thread. (Sangjin Lee via gera)

This commit is contained in:
Gera Shegalov 2015-04-21 11:46:35 -07:00
parent 105afd5477
commit 725eb52ddc
5 changed files with 34 additions and 2 deletions

View File

@ -337,6 +337,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options
which is a regression from MR1 (zxu via rkanter)
MAPREDUCE-6293. Set job classloader on uber-job's LocalContainerLauncher
event thread. (Sangjin Lee via gera)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -80,6 +80,7 @@ public class LocalContainerLauncher extends AbstractService implements
private final HashSet<File> localizedFiles;
private final AppContext context;
private final TaskUmbilicalProtocol umbilical;
private final ClassLoader jobClassLoader;
private ExecutorService taskRunner;
private Thread eventHandler;
private BlockingQueue<ContainerLauncherEvent> eventQueue =
@ -87,6 +88,12 @@ public class LocalContainerLauncher extends AbstractService implements
public LocalContainerLauncher(AppContext context,
TaskUmbilicalProtocol umbilical) {
this(context, umbilical, null);
}
public LocalContainerLauncher(AppContext context,
TaskUmbilicalProtocol umbilical,
ClassLoader jobClassLoader) {
super(LocalContainerLauncher.class.getName());
this.context = context;
this.umbilical = umbilical;
@ -94,6 +101,7 @@ public LocalContainerLauncher(AppContext context,
// (TODO/FIXME: pointless to use RPC to talk to self; should create
// LocalTaskAttemptListener or similar: implement umbilical protocol
// but skip RPC stuff)
this.jobClassLoader = jobClassLoader;
try {
curFC = FileContext.getFileContext(curDir.toURI());
@ -133,6 +141,18 @@ public void serviceStart() throws Exception {
setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
// create and start an event handling thread
eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
// if the job classloader is specified, set it onto the event handler as the
// thread context classloader so that it can be used by the event handler
// as well as the subtask runner threads
if (jobClassLoader != null) {
LOG.info("Setting " + jobClassLoader +
" as the context classloader of thread " + eventHandler.getName());
eventHandler.setContextClassLoader(jobClassLoader);
} else {
// note the current TCCL
LOG.info("Context classloader of thread " + eventHandler.getName() +
": " + eventHandler.getContextClassLoader());
}
eventHandler.start();
super.serviceStart();
}

View File

@ -889,7 +889,7 @@ private final class ContainerLauncherRouter extends AbstractService
protected void serviceStart() throws Exception {
if (job.isUber()) {
this.containerLauncher = new LocalContainerLauncher(context,
(TaskUmbilicalProtocol) taskAttemptListener);
(TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader);
} else {
this.containerLauncher = new ContainerLauncherImpl(context);
}

View File

@ -378,7 +378,7 @@ public static ClassLoader createJobClassLoader(Configuration conf)
public static void setClassLoader(ClassLoader classLoader,
Configuration conf) {
if (classLoader != null) {
LOG.info("Setting classloader " + classLoader.getClass().getName() +
LOG.info("Setting classloader " + classLoader +
" on the configuration and as the thread context classloader");
conf.setClassLoader(classLoader);
Thread.currentThread().setContextClassLoader(classLoader);

View File

@ -997,6 +997,15 @@ protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
final Configuration conf = context.getConfiguration();
// check if the job classloader is enabled and verify the TCCL
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
if (!(tccl instanceof ApplicationClassLoader)) {
throw new IOException("TCCL expected: " +
ApplicationClassLoader.class.getName() + ", actual: " +
tccl.getClass().getName());
}
}
final String ioSortMb = conf.get(MRJobConfig.IO_SORT_MB);
if (!TEST_IO_SORT_MB.equals(ioSortMb)) {
throw new IOException("io.sort.mb expected: " + TEST_IO_SORT_MB