YARN-6475. Fix some long function checkstyle issues

(Contributed by Soumabrata Chakraborty via Daniel Templeton)
This commit is contained in:
Daniel Templeton 2017-05-10 10:45:02 -07:00
parent 1e71fe8c42
commit 74a61438ca
4 changed files with 429 additions and 376 deletions

View File

@ -62,7 +62,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.*; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.*;
@ -442,24 +441,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
public int launchContainer(ContainerStartContext ctx) public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException { throws IOException, ConfigurationException {
Container container = ctx.getContainer(); Container container = ctx.getContainer();
Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String user = ctx.getUser(); String user = ctx.getUser();
String appId = ctx.getAppId();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
List<String> filecacheDirs = ctx.getFilecacheDirs();
List<String> userLocalDirs = ctx.getUserLocalDirs();
List<String> containerLocalDirs = ctx.getContainerLocalDirs();
List<String> containerLogDirs = ctx.getContainerLogDirs();
Map<Path, List<String>> localizedResources = ctx.getLocalizedResources();
verifyUsernamePattern(user); verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user);
ContainerId containerId = container.getContainerId(); ContainerId containerId = container.getContainerId();
String containerIdStr = containerId.toString();
resourcesHandler.preExecute(containerId, resourcesHandler.preExecute(containerId,
container.getResource()); container.getResource());
@ -514,39 +500,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
try { try {
Path pidFilePath = getPidFilePath(containerId); Path pidFilePath = getPidFilePath(containerId);
if (pidFilePath != null) { if (pidFilePath != null) {
List<String> prefixCommands = new ArrayList<>();
ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
.Builder(container);
addSchedPriorityCommand(prefixCommands); ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
if (prefixCommands.size() > 0) { ctx, pidFilePath, resourcesOptions, tcCommandFile);
builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
prefixCommands);
}
builder.setExecutionAttribute(LOCALIZED_RESOURCES, localizedResources) linuxContainerRuntime.launchContainer(runtimeContext);
.setExecutionAttribute(RUN_AS_USER, runAsUser)
.setExecutionAttribute(USER, user)
.setExecutionAttribute(APPID, appId)
.setExecutionAttribute(CONTAINER_ID_STR, containerIdStr)
.setExecutionAttribute(CONTAINER_WORK_DIR, containerWorkDir)
.setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
nmPrivateContainerScriptPath)
.setExecutionAttribute(NM_PRIVATE_TOKENS_PATH, nmPrivateTokensPath)
.setExecutionAttribute(PID_FILE_PATH, pidFilePath)
.setExecutionAttribute(LOCAL_DIRS, localDirs)
.setExecutionAttribute(LOG_DIRS, logDirs)
.setExecutionAttribute(FILECACHE_DIRS, filecacheDirs)
.setExecutionAttribute(USER_LOCAL_DIRS, userLocalDirs)
.setExecutionAttribute(CONTAINER_LOCAL_DIRS, containerLocalDirs)
.setExecutionAttribute(CONTAINER_LOG_DIRS, containerLogDirs)
.setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);
if (tcCommandFile != null) {
builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
}
linuxContainerRuntime.launchContainer(builder.build());
} else { } else {
LOG.info( LOG.info(
"Container was marked as inactive. Returning terminated error"); "Container was marked as inactive. Returning terminated error");
@ -617,6 +575,50 @@ public class LinuxContainerExecutor extends ContainerExecutor {
return 0; return 0;
} }
private ContainerRuntimeContext buildContainerRuntimeContext(
ContainerStartContext ctx, Path pidFilePath,
String resourcesOptions, String tcCommandFile) {
List<String> prefixCommands = new ArrayList<>();
addSchedPriorityCommand(prefixCommands);
Container container = ctx.getContainer();
ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
.Builder(container);
if (prefixCommands.size() > 0) {
builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
prefixCommands);
}
builder.setExecutionAttribute(LOCALIZED_RESOURCES,
ctx.getLocalizedResources())
.setExecutionAttribute(RUN_AS_USER, getRunAsUser(ctx.getUser()))
.setExecutionAttribute(USER, ctx.getUser())
.setExecutionAttribute(APPID, ctx.getAppId())
.setExecutionAttribute(CONTAINER_ID_STR,
container.getContainerId().toString())
.setExecutionAttribute(CONTAINER_WORK_DIR, ctx.getContainerWorkDir())
.setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
ctx.getNmPrivateContainerScriptPath())
.setExecutionAttribute(NM_PRIVATE_TOKENS_PATH,
ctx.getNmPrivateTokensPath())
.setExecutionAttribute(PID_FILE_PATH, pidFilePath)
.setExecutionAttribute(LOCAL_DIRS, ctx.getLocalDirs())
.setExecutionAttribute(LOG_DIRS, ctx.getLogDirs())
.setExecutionAttribute(FILECACHE_DIRS, ctx.getFilecacheDirs())
.setExecutionAttribute(USER_LOCAL_DIRS, ctx.getUserLocalDirs())
.setExecutionAttribute(CONTAINER_LOCAL_DIRS, ctx.getContainerLocalDirs())
.setExecutionAttribute(CONTAINER_LOG_DIRS, ctx.getContainerLogDirs())
.setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);
if (tcCommandFile != null) {
builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
}
return builder.build();
}
@Override @Override
public String[] getIpAndHost(Container container) { public String[] getIpAndHost(Container container) {
return linuxContainerRuntime.getIpAndHost(container); return linuxContainerRuntime.getIpAndHost(container);

View File

@ -761,200 +761,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void startStatusUpdater() { protected void startStatusUpdater() {
statusUpdaterRunnable = new Runnable() { statusUpdaterRunnable = new StatusUpdaterRunnable();
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartbeatID = 0;
while (!isStopped) {
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
NodeStatusUpdaterImpl.this.context
.getRegisteredCollectors());
if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
List<LogAggregationReport> logAggregationReports =
getLogAggregationReportsForApps(context
.getLogAggregationStatusForApps());
if (logAggregationReports != null
&& !logAggregationReports.isEmpty()) {
request.setLogAggregationReportsForApps(logAggregationReports);
}
}
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);
if (!handleShutdownOrResyncCommand(response)) {
nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
response);
// Explicitly put this method after checking the resync
// response. We
// don't want to remove the completed containers before resync
// because these completed containers will be reported back to RM
// when NM re-registers with RM.
// Only remove the cleanedup containers that are acked
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());
logAggregationReportForAppsTempList.clear();
lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
if (!containersToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason
.BY_RESOURCEMANAGER));
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (!appsToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}
Map<ApplicationId, ByteBuffer> systemCredentials =
response.getSystemCredentialsForApps();
if (systemCredentials != null && !systemCredentials.isEmpty()) {
((NMContext) context).setSystemCrendentialsForApps(
parseCredentials(systemCredentials));
}
List<org.apache.hadoop.yarn.api.records.Container>
containersToDecrease = response.getContainersToDecrease();
if (!containersToDecrease.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrDecreaseContainersResourceEvent(
containersToDecrease)
);
}
// SignalContainer request originally comes from end users via
// ClientRMProtocol's SignalContainer. Forward the request to
// ContainerManager which will dispatch the event to
// ContainerLauncher.
List<SignalContainerRequest> containersToSignal = response
.getContainersToSignalList();
if (containersToSignal.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrSignalContainersEvent(containersToSignal));
}
// Update QueuingLimits if ContainerManager supports queuing
ContainerQueuingLimit queuingLimit =
response.getContainerQueuingLimit();
if (queuingLimit != null) {
context.getContainerManager().updateQueuingLimit(queuingLimit);
}
}
// Handling node resource update case.
Resource newResource = response.getResource();
if (newResource != null) {
updateNMResource(newResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Node's resource is updated to " +
newResource.toString());
}
}
if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
updateTimelineClientsAddress(response);
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
// failed to connect to RM.
failedToConnect = true;
throw new YarnRuntimeException(e);
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
} finally {
synchronized (heartbeatMonitor) {
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
nextHeartBeatInterval;
try {
heartbeatMonitor.wait(nextHeartBeatInterval);
} catch (InterruptedException e) {
// Do Nothing
}
}
}
}
}
private void updateTimelineClientsAddress(
NodeHeartbeatResponse response) {
Map<ApplicationId, String> knownCollectorsMap =
response.getAppCollectorsMap();
if (knownCollectorsMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No collectors to update RM");
}
} else {
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
knownCollectorsMap.entrySet();
for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
ApplicationId appId = entry.getKey();
String collectorAddr = entry.getValue();
// Only handle applications running on local node.
// Not include apps with timeline collectors running in local
Application application = context.getApplications().get(appId);
// TODO this logic could be problematic if the collector address
// gets updated due to NM restart or collector service failure
if (application != null &&
!context.getRegisteredCollectors().containsKey(appId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
}
NMTimelinePublisher nmTimelinePublisher =
context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.setTimelineServiceAddress(
application.getAppId(), collectorAddr);
}
}
}
}
}
private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
if (updatedMasterKey != null) {
// Will be non-null only on roll-over on RM side
context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
}
updatedMasterKey = response.getNMTokenMasterKey();
if (updatedMasterKey != null) {
context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
}
}
};
statusUpdater = statusUpdater =
new Thread(statusUpdaterRunnable, "Node Status Updater"); new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start(); statusUpdater.start();
@ -1215,4 +1022,199 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
} }
} }
private class StatusUpdaterRunnable implements Runnable {
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartbeatID = 0;
while (!isStopped) {
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
NodeStatusUpdaterImpl.this.context
.getRegisteredCollectors());
if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
List<LogAggregationReport> logAggregationReports =
getLogAggregationReportsForApps(context
.getLogAggregationStatusForApps());
if (logAggregationReports != null
&& !logAggregationReports.isEmpty()) {
request.setLogAggregationReportsForApps(logAggregationReports);
}
}
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);
if (!handleShutdownOrResyncCommand(response)) {
nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
response);
// Explicitly put this method after checking the resync
// response. We
// don't want to remove the completed containers before resync
// because these completed containers will be reported back to RM
// when NM re-registers with RM.
// Only remove the cleanedup containers that are acked
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());
logAggregationReportForAppsTempList.clear();
lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
if (!containersToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason
.BY_RESOURCEMANAGER));
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (!appsToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}
Map<ApplicationId, ByteBuffer> systemCredentials =
response.getSystemCredentialsForApps();
if (systemCredentials != null && !systemCredentials.isEmpty()) {
((NMContext) context).setSystemCrendentialsForApps(
parseCredentials(systemCredentials));
}
List<org.apache.hadoop.yarn.api.records.Container>
containersToDecrease = response.getContainersToDecrease();
if (!containersToDecrease.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrDecreaseContainersResourceEvent(
containersToDecrease)
);
}
// SignalContainer request originally comes from end users via
// ClientRMProtocol's SignalContainer. Forward the request to
// ContainerManager which will dispatch the event to
// ContainerLauncher.
List<SignalContainerRequest> containersToSignal = response
.getContainersToSignalList();
if (!containersToSignal.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrSignalContainersEvent(containersToSignal));
}
// Update QueuingLimits if ContainerManager supports queuing
ContainerQueuingLimit queuingLimit =
response.getContainerQueuingLimit();
if (queuingLimit != null) {
context.getContainerManager().updateQueuingLimit(queuingLimit);
}
}
// Handling node resource update case.
Resource newResource = response.getResource();
if (newResource != null) {
updateNMResource(newResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Node's resource is updated to " +
newResource.toString());
}
}
if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
updateTimelineClientsAddress(response);
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
// failed to connect to RM.
failedToConnect = true;
throw new YarnRuntimeException(e);
} catch (Exception e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
} finally {
synchronized (heartbeatMonitor) {
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
nextHeartBeatInterval;
try {
heartbeatMonitor.wait(nextHeartBeatInterval);
} catch (InterruptedException e) {
// Do Nothing
}
}
}
}
}
private void updateTimelineClientsAddress(
NodeHeartbeatResponse response) {
Map<ApplicationId, String> knownCollectorsMap =
response.getAppCollectorsMap();
if (knownCollectorsMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No collectors to update RM");
}
} else {
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
knownCollectorsMap.entrySet();
for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
ApplicationId appId = entry.getKey();
String collectorAddr = entry.getValue();
// Only handle applications running on local node.
// Not include apps with timeline collectors running in local
Application application = context.getApplications().get(appId);
// TODO this logic could be problematic if the collector address
// gets updated due to NM restart or collector service failure
if (application != null &&
!context.getRegisteredCollectors().containsKey(appId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
}
NMTimelinePublisher nmTimelinePublisher =
context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.setTimelineServiceAddress(
application.getAppId(), collectorAddr);
}
}
}
}
}
private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
if (updatedMasterKey != null) {
// Will be non-null only on roll-over on RM side
context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
}
updatedMasterKey = response.getNMTokenMasterKey();
if (updatedMasterKey != null) {
context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
}
}
}
} }

View File

@ -213,12 +213,7 @@ public class ContainerLaunch implements Callable<Integer> {
DataOutputStream tokensOutStream = null; DataOutputStream tokensOutStream = null;
// Select the working directory for the container // Select the working directory for the container
Path containerWorkDir = Path containerWorkDir = deriveContainerWorkDir();
dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+ Path.SEPARATOR + user + Path.SEPARATOR
+ ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, false);
recordContainerWorkDir(containerID, containerWorkDir.toString()); recordContainerWorkDir(containerID, containerWorkDir.toString());
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
@ -259,12 +254,8 @@ public class ContainerLaunch implements Callable<Integer> {
sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs, sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
containerLogDirs, localResources, nmPrivateClasspathJarDir); containerLogDirs, localResources, nmPrivateClasspathJarDir);
exec.prepareContainer(new ContainerPrepareContext.Builder() prepareContainer(localResources, containerLocalDirs);
.setContainer(container)
.setLocalizedResources(localResources)
.setUser(user)
.setContainerLocalDirs(containerLocalDirs)
.setCommands(launchContext.getCommands()).build());
// Write out the environment // Write out the environment
exec.writeLaunchEnv(containerScriptOutStream, environment, exec.writeLaunchEnv(containerScriptOutStream, environment,
localResources, launchContext.getCommands(), localResources, launchContext.getCommands(),
@ -317,6 +308,39 @@ public class ContainerLaunch implements Callable<Integer> {
return ret; return ret;
} }
private Path deriveContainerWorkDir() throws IOException {
final String containerWorkDirPath =
ContainerLocalizer.USERCACHE +
Path.SEPARATOR +
container.getUser() +
Path.SEPARATOR +
ContainerLocalizer.APPCACHE +
Path.SEPARATOR +
app.getAppId().toString() +
Path.SEPARATOR +
container.getContainerId().toString();
final Path containerWorkDir =
dirsHandler.getLocalPathForWrite(
containerWorkDirPath,
LocalDirAllocator.SIZE_UNKNOWN, false);
return containerWorkDir;
}
private void prepareContainer(Map<Path, List<String>> localResources,
List<String> containerLocalDirs) throws IOException {
exec.prepareContainer(new ContainerPrepareContext.Builder()
.setContainer(container)
.setLocalizedResources(localResources)
.setUser(container.getUser())
.setContainerLocalDirs(containerLocalDirs)
.setCommands(container.getLaunchContext().getCommands())
.build());
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected boolean validateContainerState() { protected boolean validateContainerState() {
// CONTAINER_KILLED_ON_REQUEST should not be missed if the container // CONTAINER_KILLED_ON_REQUEST should not be missed if the container
@ -1116,98 +1140,9 @@ public class ContainerLaunch implements Callable<Integer> {
// TODO: Remove Windows check and use this approach on all platforms after // TODO: Remove Windows check and use this approach on all platforms after
// additional testing. See YARN-358. // additional testing. See YARN-358.
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
String inputClassPath = environment.get(Environment.CLASSPATH.name());
if (inputClassPath != null && !inputClassPath.isEmpty()) { sanitizeWindowsEnv(environment, pwd,
resources, nmPrivateClasspathJarDir);
//On non-windows, localized resources
//from distcache are available via the classpath as they were placed
//there but on windows they are not available when the classpath
//jar is created and so they "are lost" and have to be explicitly
//added to the classpath instead. This also means that their position
//is lost relative to other non-distcache classpath entries which will
//break things like mapreduce.job.user.classpath.first. An environment
//variable can be set to indicate that distcache entries should come
//first
boolean preferLocalizedJars = Boolean.parseBoolean(
environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name())
);
boolean needsSeparator = false;
StringBuilder newClassPath = new StringBuilder();
if (!preferLocalizedJars) {
newClassPath.append(inputClassPath);
needsSeparator = true;
}
// Localized resources do not exist at the desired paths yet, because the
// container launch script has not run to create symlinks yet. This
// means that FileUtil.createJarWithClassPath can't automatically expand
// wildcards to separate classpath entries for each file in the manifest.
// To resolve this, append classpath entries explicitly for each
// resource.
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
.isDirectory();
for (String linkName : entry.getValue()) {
// Append resource.
if (needsSeparator) {
newClassPath.append(File.pathSeparator);
} else {
needsSeparator = true;
}
newClassPath.append(pwd.toString())
.append(Path.SEPARATOR).append(linkName);
// FileUtil.createJarWithClassPath must use File.toURI to convert
// each file to a URI to write into the manifest's classpath. For
// directories, the classpath must have a trailing '/', but
// File.toURI only appends the trailing '/' if it is a directory that
// already exists. To resolve this, add the classpath entries with
// explicit trailing '/' here for any localized resource that targets
// a directory. Then, FileUtil.createJarWithClassPath will guarantee
// that the resulting entry in the manifest's classpath will have a
// trailing '/', and thus refer to a directory instead of a file.
if (targetIsDirectory) {
newClassPath.append(Path.SEPARATOR);
}
}
}
if (preferLocalizedJars) {
if (needsSeparator) {
newClassPath.append(File.pathSeparator);
}
newClassPath.append(inputClassPath);
}
// When the container launches, it takes the parent process's environment
// and then adds/overwrites with the entries from the container launch
// context. Do the same thing here for correct substitution of
// environment variables in the classpath jar manifest.
Map<String, String> mergedEnv = new HashMap<String, String>(
System.getenv());
mergedEnv.putAll(environment);
// this is hacky and temporary - it's to preserve the windows secure
// behavior but enable non-secure windows to properly build the class
// path for access to job.jar/lib/xyz and friends (see YARN-2803)
Path jarDir;
if (exec instanceof WindowsSecureContainerExecutor) {
jarDir = nmPrivateClasspathJarDir;
} else {
jarDir = pwd;
}
String[] jarCp = FileUtil.createJarWithClassPath(
newClassPath.toString(), jarDir, pwd, mergedEnv);
// In a secure cluster the classpath jar must be localized to grant access
Path localizedClassPathJar = exec.localizeClasspathJar(
new Path(jarCp[0]), pwd, container.getUser());
String replacementClassPath = localizedClassPathJar.toString() + jarCp[1];
environment.put(Environment.CLASSPATH.name(), replacementClassPath);
}
} }
// put AuxiliaryService data to environment // put AuxiliaryService data to environment
for (Map.Entry<String, ByteBuffer> meta : containerManager for (Map.Entry<String, ByteBuffer> meta : containerManager
@ -1217,6 +1152,103 @@ public class ContainerLaunch implements Callable<Integer> {
} }
} }
private void sanitizeWindowsEnv(Map<String, String> environment, Path pwd,
Map<Path, List<String>> resources, Path nmPrivateClasspathJarDir)
throws IOException {
String inputClassPath = environment.get(Environment.CLASSPATH.name());
if (inputClassPath != null && !inputClassPath.isEmpty()) {
//On non-windows, localized resources
//from distcache are available via the classpath as they were placed
//there but on windows they are not available when the classpath
//jar is created and so they "are lost" and have to be explicitly
//added to the classpath instead. This also means that their position
//is lost relative to other non-distcache classpath entries which will
//break things like mapreduce.job.user.classpath.first. An environment
//variable can be set to indicate that distcache entries should come
//first
boolean preferLocalizedJars = Boolean.parseBoolean(
environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name())
);
boolean needsSeparator = false;
StringBuilder newClassPath = new StringBuilder();
if (!preferLocalizedJars) {
newClassPath.append(inputClassPath);
needsSeparator = true;
}
// Localized resources do not exist at the desired paths yet, because the
// container launch script has not run to create symlinks yet. This
// means that FileUtil.createJarWithClassPath can't automatically expand
// wildcards to separate classpath entries for each file in the manifest.
// To resolve this, append classpath entries explicitly for each
// resource.
for (Map.Entry<Path, List<String>> entry : resources.entrySet()) {
boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
.isDirectory();
for (String linkName : entry.getValue()) {
// Append resource.
if (needsSeparator) {
newClassPath.append(File.pathSeparator);
} else {
needsSeparator = true;
}
newClassPath.append(pwd.toString())
.append(Path.SEPARATOR).append(linkName);
// FileUtil.createJarWithClassPath must use File.toURI to convert
// each file to a URI to write into the manifest's classpath. For
// directories, the classpath must have a trailing '/', but
// File.toURI only appends the trailing '/' if it is a directory that
// already exists. To resolve this, add the classpath entries with
// explicit trailing '/' here for any localized resource that targets
// a directory. Then, FileUtil.createJarWithClassPath will guarantee
// that the resulting entry in the manifest's classpath will have a
// trailing '/', and thus refer to a directory instead of a file.
if (targetIsDirectory) {
newClassPath.append(Path.SEPARATOR);
}
}
}
if (preferLocalizedJars) {
if (needsSeparator) {
newClassPath.append(File.pathSeparator);
}
newClassPath.append(inputClassPath);
}
// When the container launches, it takes the parent process's environment
// and then adds/overwrites with the entries from the container launch
// context. Do the same thing here for correct substitution of
// environment variables in the classpath jar manifest.
Map<String, String> mergedEnv = new HashMap<String, String>(
System.getenv());
mergedEnv.putAll(environment);
// this is hacky and temporary - it's to preserve the windows secure
// behavior but enable non-secure windows to properly build the class
// path for access to job.jar/lib/xyz and friends (see YARN-2803)
Path jarDir;
if (exec instanceof WindowsSecureContainerExecutor) {
jarDir = nmPrivateClasspathJarDir;
} else {
jarDir = pwd;
}
String[] jarCp = FileUtil.createJarWithClassPath(
newClassPath.toString(), jarDir, pwd, mergedEnv);
// In a secure cluster the classpath jar must be localized to grant access
Path localizedClassPathJar = exec.localizeClasspathJar(
new Path(jarCp[0]), pwd, container.getUser());
String replacementClassPath = localizedClassPathJar.toString() + jarCp[1];
environment.put(Environment.CLASSPATH.name(), replacementClassPath);
}
}
public static String getExitCodeFile(String pidFile) { public static String getExitCodeFile(String pidFile) {
return pidFile + EXIT_CODE_FILE_SUFFIX; return pidFile + EXIT_CODE_FILE_SUFFIX;
} }

View File

@ -424,10 +424,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
//List<String> -> stored as List -> fetched/converted to List<String> //List<String> -> stored as List -> fetched/converted to List<String>
//we can't do better here thanks to type-erasure //we can't do better here thanks to type-erasure
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<String> localDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
@SuppressWarnings("unchecked")
List<String> logDirs = ctx.getExecutionAttribute(LOG_DIRS);
@SuppressWarnings("unchecked")
List<String> filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS); List<String> filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<String> containerLocalDirs = ctx.getExecutionAttribute( List<String> containerLocalDirs = ctx.getExecutionAttribute(
@ -489,9 +485,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand); addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);
Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute(
NM_PRIVATE_CONTAINER_SCRIPT_PATH);
String disableOverride = environment.get( String disableOverride = environment.get(
ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE); ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE);
@ -511,33 +504,8 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
String commandFile = dockerClient.writeCommandToTempFile(runCommand, String commandFile = dockerClient.writeCommandToTempFile(runCommand,
containerIdStr); containerIdStr);
PrivilegedOperation launchOp = new PrivilegedOperation( PrivilegedOperation launchOp = buildLaunchOp(ctx,
PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER); commandFile, runCommand);
launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER),
Integer.toString(PrivilegedOperation
.RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
ctx.getExecutionAttribute(APPID),
containerIdStr, containerWorkDir.toString(),
nmPrivateContainerScriptPath.toUri().getPath(),
ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
localDirs),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
logDirs),
commandFile,
resourcesOpts);
String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
if (tcCommandFile != null) {
launchOp.appendArgs(tcCommandFile);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Launching container with cmd: " + runCommand
.getCommandWithArguments());
}
try { try {
privilegedOperationExecutor.executePrivilegedOperation(null, privilegedOperationExecutor.executePrivilegedOperation(null,
@ -635,4 +603,53 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
} }
return null; return null;
} }
private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
String commandFile, DockerRunCommand runCommand) {
String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
String containerIdStr = ctx.getContainer().getContainerId().toString();
Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute(
NM_PRIVATE_CONTAINER_SCRIPT_PATH);
Path containerWorkDir = ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
//we can't do better here thanks to type-erasure
@SuppressWarnings("unchecked")
List<String> localDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
@SuppressWarnings("unchecked")
List<String> logDirs = ctx.getExecutionAttribute(LOG_DIRS);
String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);
PrivilegedOperation launchOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER),
Integer.toString(PrivilegedOperation
.RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
ctx.getExecutionAttribute(APPID),
containerIdStr,
containerWorkDir.toString(),
nmPrivateContainerScriptPath.toUri().getPath(),
ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
localDirs),
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
logDirs),
commandFile,
resourcesOpts);
String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
if (tcCommandFile != null) {
launchOp.appendArgs(tcCommandFile);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Launching container with cmd: " + runCommand
.getCommandWithArguments());
}
return launchOp;
}
} }