YARN-3626. On Windows localized resources are not moved to the front of the classpath when they should be. Contributed by Craig Welch
This commit is contained in:
parent
341a476812
commit
0f95921447
|
@ -241,7 +241,12 @@ public class MRApps extends Apps {
|
|||
boolean userClassesTakesPrecedence =
|
||||
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
|
||||
|
||||
String classpathEnvVar =
|
||||
if (userClassesTakesPrecedence) {
|
||||
conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE,
|
||||
"true");
|
||||
}
|
||||
|
||||
String classpathEnvVar =
|
||||
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
|
||||
? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
|
||||
|
||||
|
|
|
@ -480,6 +480,9 @@ Release 2.7.1 - UNRELEASED
|
|||
YARN-3493. RM fails to come up with error "Failed to load/recover state"
|
||||
when mem settings are changed. (Jian He via wangda)
|
||||
|
||||
YARN-3626. On Windows localized resources are not moved to the front
|
||||
of the classpath when they should be. (Craig Welch via xgong)
|
||||
|
||||
Release 2.7.0 - 2015-04-20
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1248,6 +1248,16 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
|
||||
+ "application.classpath";
|
||||
|
||||
/**
|
||||
* Whether or not entries from the distributed cache should be preferred over
|
||||
* the rest of the YARN CLASSPATH
|
||||
*/
|
||||
public static final String YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE =
|
||||
YARN_PREFIX + "application.classpath.prepend.distcache";
|
||||
|
||||
public static final boolean
|
||||
DEFAULT_YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE = false;
|
||||
|
||||
/**
|
||||
* Default platform-agnostic CLASSPATH for YARN applications. A
|
||||
* comma-separated list of CLASSPATH entries. The parameter expansion marker
|
||||
|
|
|
@ -726,8 +726,28 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
if (Shell.WINDOWS) {
|
||||
|
||||
String inputClassPath = environment.get(Environment.CLASSPATH.name());
|
||||
|
||||
if (inputClassPath != null && !inputClassPath.isEmpty()) {
|
||||
StringBuilder newClassPath = new StringBuilder(inputClassPath);
|
||||
|
||||
//On non-windows, localized resources
|
||||
//from distcache are available via the classpath as they were placed
|
||||
//there but on windows they are not available when the classpath
|
||||
//jar is created and so they "are lost" and have to be explicitly
|
||||
//added to the classpath instead. This also means that their position
|
||||
//is lost relative to other non-distcache classpath entries which will
|
||||
//break things like mapreduce.job.user.classpath.first.
|
||||
|
||||
boolean preferLocalizedJars = conf.getBoolean(
|
||||
YarnConfiguration.YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE,
|
||||
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE
|
||||
);
|
||||
|
||||
boolean needsSeparator = false;
|
||||
StringBuilder newClassPath = new StringBuilder();
|
||||
if (!preferLocalizedJars) {
|
||||
newClassPath.append(inputClassPath);
|
||||
needsSeparator = true;
|
||||
}
|
||||
|
||||
// Localized resources do not exist at the desired paths yet, because the
|
||||
// container launch script has not run to create symlinks yet. This
|
||||
|
@ -741,7 +761,12 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
|
||||
for (String linkName : entry.getValue()) {
|
||||
// Append resource.
|
||||
newClassPath.append(File.pathSeparator).append(pwd.toString())
|
||||
if (needsSeparator) {
|
||||
newClassPath.append(File.pathSeparator);
|
||||
} else {
|
||||
needsSeparator = true;
|
||||
}
|
||||
newClassPath.append(pwd.toString())
|
||||
.append(Path.SEPARATOR).append(linkName);
|
||||
|
||||
// FileUtil.createJarWithClassPath must use File.toURI to convert
|
||||
|
@ -758,6 +783,12 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (preferLocalizedJars) {
|
||||
if (needsSeparator) {
|
||||
newClassPath.append(File.pathSeparator);
|
||||
}
|
||||
newClassPath.append(inputClassPath);
|
||||
}
|
||||
|
||||
// When the container launches, it takes the parent process's environment
|
||||
// and then adds/overwrites with the entries from the container launch
|
||||
|
|
|
@ -39,6 +39,9 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.jar.JarFile;
|
||||
import java.util.jar.Manifest;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -65,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
|
@ -78,12 +82,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.ShellScriptBuilder;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
|
||||
|
@ -97,6 +107,17 @@ import org.junit.Test;
|
|||
|
||||
public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||
|
||||
protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
|
||||
conf), new NMTokenSecretManagerInNM(), null,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
|
||||
public int getHttpPort() {
|
||||
return HTTP_PORT;
|
||||
};
|
||||
public NodeId getNodeId() {
|
||||
return NodeId.newInstance("ahost", 1234);
|
||||
};
|
||||
};
|
||||
|
||||
public TestContainerLaunch() throws UnsupportedFileSystemException {
|
||||
super();
|
||||
}
|
||||
|
@ -371,6 +392,114 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrependDistcache() throws Exception {
|
||||
|
||||
// Test is only relevant on Windows
|
||||
Assume.assumeTrue(Shell.WINDOWS);
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
|
||||
ContainerId cId = ContainerId.newContainerId(appAttemptId, 0);
|
||||
Map<String, String> userSetEnv = new HashMap<String, String>();
|
||||
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
||||
userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
|
||||
userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
|
||||
userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
|
||||
userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
|
||||
userSetEnv.put(Environment.USER.key(), "user_set_" +
|
||||
Environment.USER.key());
|
||||
userSetEnv.put(Environment.LOGNAME.name(), "user_set_LOGNAME");
|
||||
userSetEnv.put(Environment.PWD.name(), "user_set_PWD");
|
||||
userSetEnv.put(Environment.HOME.name(), "user_set_HOME");
|
||||
userSetEnv.put(Environment.CLASSPATH.name(), "SYSTEM_CLPATH");
|
||||
containerLaunchContext.setEnvironment(userSetEnv);
|
||||
Container container = mock(Container.class);
|
||||
when(container.getContainerId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(containerLaunchContext);
|
||||
when(container.getLocalizedResources()).thenReturn(null);
|
||||
Dispatcher dispatcher = mock(Dispatcher.class);
|
||||
EventHandler eventHandler = new EventHandler() {
|
||||
public void handle(Event event) {
|
||||
Assert.assertTrue(event instanceof ContainerExitEvent);
|
||||
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
|
||||
Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||
exitEvent.getType());
|
||||
}
|
||||
};
|
||||
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
ContainerLaunch launch = new ContainerLaunch(distContext, conf,
|
||||
dispatcher, exec, null, container, dirsHandler, containerManager);
|
||||
|
||||
String testDir = System.getProperty("test.build.data",
|
||||
"target/test-dir");
|
||||
Path pwd = new Path(testDir);
|
||||
List<Path> appDirs = new ArrayList<Path>();
|
||||
List<String> containerLogs = new ArrayList<String>();
|
||||
|
||||
Map<Path, List<String>> resources = new HashMap<Path, List<String>>();
|
||||
Path userjar = new Path("user.jar");
|
||||
List<String> lpaths = new ArrayList<String>();
|
||||
lpaths.add("userjarlink.jar");
|
||||
resources.put(userjar, lpaths);
|
||||
|
||||
Path nmp = new Path(testDir);
|
||||
|
||||
launch.sanitizeEnv(
|
||||
userSetEnv, pwd, appDirs, containerLogs, resources, nmp);
|
||||
|
||||
List<String> result =
|
||||
getJarManifestClasspath(userSetEnv.get(Environment.CLASSPATH.name()));
|
||||
|
||||
Assert.assertTrue(result.size() > 1);
|
||||
Assert.assertTrue(
|
||||
result.get(result.size() - 1).endsWith("userjarlink.jar"));
|
||||
|
||||
//Now move userjar to the front
|
||||
|
||||
cId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
when(container.getContainerId()).thenReturn(cId);
|
||||
|
||||
conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE,
|
||||
"true");
|
||||
|
||||
launch = new ContainerLaunch(distContext, conf,
|
||||
dispatcher, exec, null, container, dirsHandler, containerManager);
|
||||
|
||||
launch.sanitizeEnv(
|
||||
userSetEnv, pwd, appDirs, containerLogs, resources, nmp);
|
||||
|
||||
result =
|
||||
getJarManifestClasspath(userSetEnv.get(Environment.CLASSPATH.name()));
|
||||
|
||||
Assert.assertTrue(result.size() > 1);
|
||||
Assert.assertTrue(
|
||||
result.get(0).endsWith("userjarlink.jar"));
|
||||
|
||||
}
|
||||
|
||||
private static List<String> getJarManifestClasspath(String path)
|
||||
throws Exception {
|
||||
List<String> classpath = new ArrayList<String>();
|
||||
JarFile jarFile = new JarFile(path);
|
||||
Manifest manifest = jarFile.getManifest();
|
||||
String cps = manifest.getMainAttributes().getValue("Class-Path");
|
||||
StringTokenizer cptok = new StringTokenizer(cps);
|
||||
while (cptok.hasMoreTokens()) {
|
||||
String cpentry = cptok.nextToken();
|
||||
classpath.add(cpentry);
|
||||
}
|
||||
return classpath;
|
||||
}
|
||||
|
||||
/**
|
||||
* See if environment variable is forwarded using sanitizeEnv.
|
||||
* @throws Exception
|
||||
|
|
Loading…
Reference in New Issue