MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1227422 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-01-05 01:29:52 +00:00
parent e793ba8cba
commit 0870734787
8 changed files with 250 additions and 81 deletions

View File

@ -403,6 +403,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3595. Add missing TestCounters#testCounterValue test from branch
1 to 0.23 (Tom White via sseth)
MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks.
(vinodkv via acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.util.Apps;
@SuppressWarnings("deprecation")
public class MapReduceChildJVM {
private static String getTaskLogFile(LogName filter) {
@ -46,7 +47,7 @@ public class MapReduceChildJVM {
jobConf.get(JobConf.MAPRED_TASK_ENV));
}
return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV,
jobConf.get(jobConf.MAPRED_TASK_ENV));
jobConf.get(JobConf.MAPRED_TASK_ENV));
}
private static String getChildLogLevel(JobConf conf, boolean isMap) {
@ -68,29 +69,9 @@ public class MapReduceChildJVM {
JobConf conf = task.conf;
// Shell
environment.put(
Environment.SHELL.name(),
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_SHELL,
MRJobConfig.DEFAULT_SHELL)
);
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
Apps.addToEnvironment(
environment,
Environment.LD_LIBRARY_PATH.name(),
Environment.PWD.$());
// Add the env variables passed by the user & admin
// Add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf, task.isMapTask());
Apps.setEnvFromInputString(environment, mapredChildEnv);
Apps.setEnvFromInputString(
environment,
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
);
// Set logging level in the environment.
// This is so that, if the child forks another "bin/hadoop" (common in

View File

@ -27,6 +27,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -154,6 +156,8 @@ public abstract class TaskAttemptImpl implements
private Token<JobTokenIdentifier> jobToken;
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
private static String initialClasspath = null;
private static Object commonContainerSpecLock = new Object();
private static ContainerLaunchContext commonContainerSpec = null;
private static final Object classpathLock = new Object();
private long launchTime;
private long finishTime;
@ -497,29 +501,27 @@ public abstract class TaskAttemptImpl implements
/**
* Create a {@link LocalResource} record with all the given parameters.
* TODO: This should pave way for Builder pattern.
*/
private static LocalResource createLocalResource(FileSystem fc,
RecordFactory recordFactory, Path file, LocalResourceType type,
LocalResourceVisibility visibility) throws IOException {
private static LocalResource createLocalResource(FileSystem fc, Path file,
LocalResourceType type, LocalResourceVisibility visibility)
throws IOException {
FileStatus fstat = fc.getFileStatus(file);
LocalResource resource =
recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
.getPath())));
resource.setType(type);
resource.setVisibility(visibility);
resource.setSize(fstat.getLen());
resource.setTimestamp(fstat.getModificationTime());
return resource;
URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
.getPath()));
long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime();
return BuilderUtils.newLocalResource(resourceURL, type, visibility,
resourceSize, resourceModificationTime);
}
/**
* Lock this on initialClasspath so that there is only one fork in the AM for
* getting the initial class-path. TODO: This should go away once we construct
* a parent CLC and use it for all the containers.
* getting the initial class-path. TODO: We already construct
* a parent CLC and use it for all the containers, so this should go away
* once the mr-generated-classpath stuff is gone.
*/
private String getInitialClasspath() throws IOException {
private static String getInitialClasspath() throws IOException {
synchronized (classpathLock) {
if (initialClasspathFlag.get()) {
return initialClasspath;
@ -534,11 +536,14 @@ public abstract class TaskAttemptImpl implements
/**
* Create the {@link ContainerLaunchContext} for this attempt.
* Create the common {@link ContainerLaunchContext} for all attempts.
* @param applicationACLs
*/
private ContainerLaunchContext createContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs) {
private static ContainerLaunchContext createCommonContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
Token<JobTokenIdentifier> jobToken,
final org.apache.hadoop.mapred.JobID oldJobId,
Collection<Token<? extends TokenIdentifier>> fsTokens) {
// Application resources
Map<String, LocalResource> localResources =
@ -556,13 +561,13 @@ public abstract class TaskAttemptImpl implements
FileSystem remoteFS = FileSystem.get(conf);
// //////////// Set up JobJar to be localized properly on the remote NM.
if (conf.get(MRJobConfig.JAR) != null) {
Path remoteJobJar = (new Path(remoteTask.getConf().get(
MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
String jobJar = conf.get(MRJobConfig.JAR);
if (jobJar != null) {
Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
.getUri(), remoteFS.getWorkingDirectory());
localResources.put(
MRJobConfig.JOB_JAR,
createLocalResource(remoteFS, recordFactory, remoteJobJar,
createLocalResource(remoteFS, remoteJobJar,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-jar file on the remote FS is "
+ remoteJobJar.toUri().toASCIIString());
@ -584,7 +589,7 @@ public abstract class TaskAttemptImpl implements
new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
localResources.put(
MRJobConfig.JOB_CONF_FILE,
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
createLocalResource(remoteFS, remoteJobConfPath,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-conf file on the remote FS is "
+ remoteJobConfPath.toUri().toASCIIString());
@ -630,19 +635,81 @@ public abstract class TaskAttemptImpl implements
throw new YarnException(e);
}
// Setup environment
MapReduceChildJVM.setVMEnv(environment, remoteTask);
// Shell
environment.put(
Environment.SHELL.name(),
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_SHELL,
MRJobConfig.DEFAULT_SHELL)
);
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
Apps.addToEnvironment(
environment,
Environment.LD_LIBRARY_PATH.name(),
Environment.PWD.$());
// Add the env variables passed by the admin
Apps.setEnvFromInputString(
environment,
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
);
// Construct the actual Container
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container = BuilderUtils
.newContainerLaunchContext(null, conf
.get(MRJobConfig.USER_NAME), null, localResources,
environment, null, serviceData, tokens, applicationACLs);
return container;
}
static ContainerLaunchContext createContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
ContainerId containerID, Configuration conf,
Token<JobTokenIdentifier> jobToken, Task remoteTask,
final org.apache.hadoop.mapred.JobID oldJobId,
Resource assignedCapability, WrappedJvmID jvmID,
TaskAttemptListener taskAttemptListener,
Collection<Token<? extends TokenIdentifier>> fsTokens) {
synchronized (commonContainerSpecLock) {
if (commonContainerSpec == null) {
commonContainerSpec = createCommonContainerLaunchContext(
applicationACLs, conf, jobToken, oldJobId, fsTokens);
}
}
// Fill in the fields needed per-container that are missing in the common
// spec.
// Setup environment by cloning from common env.
Map<String, String> env = commonContainerSpec.getEnvironment();
Map<String, String> myEnv = new HashMap<String, String>(env.size());
myEnv.putAll(env);
MapReduceChildJVM.setVMEnv(myEnv, remoteTask);
// Set up the launch command
List<String> commands = MapReduceChildJVM.getVMCommand(
taskAttemptListener.getAddress(), remoteTask,
jvmID);
taskAttemptListener.getAddress(), remoteTask, jvmID);
// Duplicate the ByteBuffers for access by multiple containers.
Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
for (Entry<String, ByteBuffer> entry : commonContainerSpec
.getServiceData().entrySet()) {
myServiceData.put(entry.getKey(), entry.getValue().duplicate());
}
// Construct the actual Container
ContainerLaunchContext container = BuilderUtils
.newContainerLaunchContext(containerID, conf
.get(MRJobConfig.USER_NAME), assignedCapability, localResources,
environment, commands, serviceData, tokens, applicationACLs);
ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
containerID, commonContainerSpec.getUser(), assignedCapability,
commonContainerSpec.getLocalResources(), myEnv, commands,
myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
applicationACLs);
return container;
}
@ -1022,7 +1089,7 @@ public abstract class TaskAttemptImpl implements
private static class ContainerAssignedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings({ "unchecked", "deprecation" })
@SuppressWarnings({ "unchecked" })
@Override
public void transition(final TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
@ -1046,20 +1113,16 @@ public abstract class TaskAttemptImpl implements
//launch the container
//create the container object to be launched for a given Task attempt
taskAttempt.eventHandler.handle(
new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
taskAttempt.containerID,
taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
@Override
public ContainerLaunchContext getContainer() {
return taskAttempt.createContainerLaunchContext(cEvent
.getApplicationACLs());
}
@Override
public Task getRemoteTask() { // classic mapred Task, not YARN version
return taskAttempt.remoteTask;
}
});
ContainerLaunchContext launchContext = createContainerLaunchContext(
cEvent.getApplicationACLs(), taskAttempt.containerID,
taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
taskAttempt.oldJobId, taskAttempt.assignedCapability,
taskAttempt.jvmID, taskAttempt.taskAttemptListener,
taskAttempt.fsTokens);
taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
taskAttempt.attemptId, taskAttempt.containerID,
taskAttempt.containerMgrAddress, taskAttempt.containerToken,
launchContext, taskAttempt.remoteTask));
// send event to speculator that our container needs are satisfied
taskAttempt.eventHandler.handle
@ -1197,7 +1260,6 @@ public abstract class TaskAttemptImpl implements
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
@SuppressWarnings("deprecation")
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(taskAttempt.conf,
TypeConverter.fromYarn(taskAttempt.attemptId));

View File

@ -24,17 +24,31 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
private final ContainerLaunchContext container;
private final Task task;
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
ContainerId containerID, String containerMgrAddress,
ContainerToken containerToken) {
super(taskAttemptID, containerID, containerMgrAddress,
containerToken,
ContainerToken containerToken,
ContainerLaunchContext containerLaunchContext, Task remoteTask) {
super(taskAttemptID, containerID, containerMgrAddress, containerToken,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
this.container = containerLaunchContext;
this.task = remoteTask;
}
public abstract ContainerLaunchContext getContainer();
public abstract Task getRemoteTask();
public ContainerLaunchContext getContainer() {
return this.container;
}
public Task getRemoteTask() {
return this.task;
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
@ -65,7 +66,9 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
@ -173,7 +176,8 @@ public class MRApp extends MRAppMaster {
}
public Job submit(Configuration conf) throws Exception {
String user = conf.get(MRJobConfig.USER_NAME, "mapred");
String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
.getCurrentUser().getShortUserName());
conf.set(MRJobConfig.USER_NAME, user);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
@ -187,6 +191,14 @@ public class MRApp extends MRAppMaster {
start();
DefaultMetricsSystem.shutdown();
Job job = getContext().getAllJobs().values().iterator().next();
// Write job.xml
String jobFile = MRApps.getJobFile(conf, user,
TypeConverter.fromYarn(job.getID()));
LOG.info("Writing job conf to " + jobFile);
new File(jobFile).getParentFile().mkdirs();
conf.writeXml(new FileOutputStream(jobFile));
return job;
}
@ -308,7 +320,7 @@ public class MRApp extends MRAppMaster {
return new TaskAttemptListener(){
@Override
public InetSocketAddress getAddress() {
return null;
return NetUtils.createSocketAddr("localhost:54321");
}
@Override
public void registerLaunchedTask(TaskAttemptId attemptID,
@ -337,11 +349,14 @@ public class MRApp extends MRAppMaster {
return new MockContainerLauncher();
}
class MockContainerLauncher implements ContainerLauncher {
protected class MockContainerLauncher implements ContainerLauncher {
//We are running locally so set the shuffle port to -1
int shufflePort = -1;
public MockContainerLauncher() {
}
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerLauncherEvent event) {
@ -474,6 +489,7 @@ public class MRApp extends MRAppMaster {
}
@Override
protected void setup(JobImpl job) throws IOException {
super.setup(job);
job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
job.remoteJobConfFile = new Path("test");
}

View File

@ -40,6 +40,7 @@ import org.junit.Test;
/**
* Tests the state machine of MR App.
*/
@SuppressWarnings("unchecked")
public class TestMRApp {
@Test

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.junit.Test;
public class TestMapReduceChildJVM {
private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
@Test
public void testCommandLine() throws Exception {
MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
Job job = app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
Assert.assertEquals(
"[exec $JAVA_HOME/bin/java" +
" -Djava.net.preferIPv4Stack=true" +
" -Dhadoop.metrics.log.level=WARN" +
" -Xmx200m -Djava.io.tmpdir=$PWD/tmp" +
" -Dlog4j.configuration=container-log4j.properties" +
" -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR>" +
" -Dyarn.app.mapreduce.container.log.filesize=0" +
" -Dhadoop.root.logger=INFO,CLA" +
" org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
" 54321" +
" attempt_0_0000_m_000000_0" +
" 0" +
" 1><LOG_DIR>/stdout" +
" 2><LOG_DIR>/stderr ]", app.myCommandLine);
}
private static final class MyMRApp extends MRApp {
private String myCommandLine;
public MyMRApp(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
return new MockContainerLauncher() {
@Override
public void handle(ContainerLauncherEvent event) {
if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
ContainerLaunchContext launchContext = launchEvent.getContainer();
String cmdString = launchContext.getCommands().toString();
LOG.info("launchContext " + cmdString);
myCommandLine = cmdString;
}
super.handle(event);
}
};
}
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@SuppressWarnings("unchecked")
public class TestTaskAttempt{
@Test