YARN-3684. Changed ContainerExecutor's primary lifecycle methods to use a more extensible mechanism of context objects. Contributed by Sidharta Seethana.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-05-21 15:50:23 -07:00
parent 4fc942a84f
commit 53fafcf061
26 changed files with 1163 additions and 242 deletions

View File

@ -253,6 +253,9 @@ Release 2.8.0 - UNRELEASED
YARN-3583. Support of NodeLabel object instead of plain String YARN-3583. Support of NodeLabel object instead of plain String
in YarnClient side. (Sunil G via wangda) in YarnClient side. (Sunil G via wangda)
YARN-3684. Changed ContainerExecutor's primary lifecycle methods to use a more
extensible mechanism of context objects. (Sidharta Seethana via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -46,6 +45,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -111,61 +116,67 @@ public abstract class ContainerExecutor implements Configurable {
* For $rsrc in job resources * For $rsrc in job resources
* Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef * Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef
* </pre> * </pre>
* @param user user name of application owner * @param ctx LocalizerStartContext that encapsulates necessary information
* @param appId id of the application * for starting a localizer.
* @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
* @param nmAddr RPC address to contact NM
* @param dirsHandler NM local dirs service, for nm-local-dirs and nm-log-dirs
* @throws IOException For most application init failures * @throws IOException For most application init failures
* @throws InterruptedException If application init thread is halted by NM * @throws InterruptedException If application init thread is halted by NM
*/ */
public abstract void startLocalizer(Path nmPrivateContainerTokens, public abstract void startLocalizer(LocalizerStartContext ctx)
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
throws IOException, InterruptedException; throws IOException, InterruptedException;
/** /**
* Launch the container on the node. This is a blocking call and returns only * Launch the container on the node. This is a blocking call and returns only
* when the container exits. * when the container exits.
* @param container the container to be launched * @param ctx Encapsulates information necessary for launching containers.
* @param nmPrivateContainerScriptPath the path for launch script
* @param nmPrivateTokensPath the path for tokens for the container
* @param user the user of the container
* @param appId the appId of the container
* @param containerWorkDir the work dir for the container
* @param localDirs nm-local-dirs to be used for this container
* @param logDirs nm-log-dirs to be used for this container
* @return the return status of the launch * @return the return status of the launch
* @throws IOException * @throws IOException
*/ */
public abstract int launchContainer(Container container, public abstract int launchContainer(ContainerStartContext ctx) throws
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, IOException;
String user, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException;
public abstract boolean signalContainer(String user, String pid, /**
Signal signal) * Signal container with the specified signal.
* @param ctx Encapsulates information necessary for signaling containers.
* @return returns true if the operation succeeded
* @throws IOException
*/
public abstract boolean signalContainer(ContainerSignalContext ctx)
throws IOException; throws IOException;
public abstract void deleteAsUser(String user, Path subDir, Path... basedirs) /**
* Delete specified directories as a given user.
* @param ctx Encapsulates information necessary for deletion.
* @throws IOException
* @throws InterruptedException
*/
public abstract void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException; throws IOException, InterruptedException;
public abstract boolean isContainerProcessAlive(String user, String pid) /**
* Check if a container is alive.
* @param ctx Encapsulates information necessary for container liveness check.
* @return true if container is still alive
* @throws IOException
*/
public abstract boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException; throws IOException;
/** /**
* Recover an already existing container. This is a blocking call and returns * Recover an already existing container. This is a blocking call and returns
* only when the container exits. Note that the container must have been * only when the container exits. Note that the container must have been
* activated prior to this call. * activated prior to this call.
* @param user the user of the container * @param ctx encapsulates information necessary to reacquire container
* @param containerId The ID of the container to reacquire
* @return The exit code of the pre-existing container * @return The exit code of the pre-existing container
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
public int reacquireContainer(String user, ContainerId containerId) public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException {
String user = ctx.getUser();
ContainerId containerId = ctx.getContainerId();
Path pidPath = getPidFilePath(containerId); Path pidPath = getPidFilePath(containerId);
if (pidPath == null) { if (pidPath == null) {
LOG.warn(containerId + " is not active, returning terminated error"); LOG.warn(containerId + " is not active, returning terminated error");
@ -179,7 +190,12 @@ public abstract class ContainerExecutor implements Configurable {
} }
LOG.info("Reacquiring " + containerId + " with pid " + pid); LOG.info("Reacquiring " + containerId + " with pid " + pid);
while(isContainerProcessAlive(user, pid)) { ContainerLivenessContext livenessContext = new ContainerLivenessContext
.Builder()
.setUser(user)
.setPid(pid)
.build();
while(isContainerProcessAlive(livenessContext)) {
Thread.sleep(1000); Thread.sleep(1000);
} }
@ -486,7 +502,11 @@ public abstract class ContainerExecutor implements Configurable {
public void run() { public void run() {
try { try {
Thread.sleep(delay); Thread.sleep(delay);
containerExecutor.signalContainer(user, pid, signal); containerExecutor.signalContainer(new ContainerSignalContext.Builder()
.setUser(user)
.setPid(pid)
.setSignal(signal)
.build());
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} catch (IOException e) { } catch (IOException e) {

View File

@ -55,6 +55,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -94,10 +99,14 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public void startLocalizer(Path nmPrivateContainerTokensPath, public void startLocalizer(LocalizerStartContext ctx)
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs(); List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs(); List<String> logDirs = dirsHandler.getLogDirs();
@ -130,11 +139,15 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public int launchContainer(Container container, public int launchContainer(ContainerStartContext ctx) throws IOException {
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, Container container = ctx.getContainer();
String user, String appId, Path containerWorkDir, Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
List<String> localDirs, List<String> logDirs) throws IOException { Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String user = ctx.getUser();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
FsPermission dirPerm = new FsPermission(APPDIR_PERM); FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerId(); ContainerId containerId = container.getContainerId();
@ -394,8 +407,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public boolean signalContainer(String user, String pid, Signal signal) public boolean signalContainer(ContainerSignalContext ctx)
throws IOException { throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
Signal signal = ctx.getSignal();
LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
+ " as user " + user); + " as user " + user);
if (!containerIsAlive(pid)) { if (!containerIsAlive(pid)) {
@ -413,8 +430,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public boolean isContainerProcessAlive(String user, String pid) public boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException { throws IOException {
String pid = ctx.getPid();
return containerIsAlive(pid); return containerIsAlive(pid);
} }
@ -451,9 +470,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public void deleteAsUser(String user, Path subDir, Path... baseDirs) public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) { Path subDir = ctx.getSubDir();
List<Path> baseDirs = ctx.getBasedirs();
if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + subDir); LOG.info("Deleting absolute path : " + subDir);
if (!lfs.delete(subDir, true)) { if (!lfs.delete(subDir, true)) {
//Maybe retry //Maybe retry

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
@ -290,10 +291,16 @@ public class DeletionService extends AbstractService {
try { try {
LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]"); LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
if (baseDirs == null || baseDirs.size() == 0) { if (baseDirs == null || baseDirs.size() == 0) {
delService.exec.deleteAsUser(user, subDir, (Path[])null); delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(subDir)
.build());
} else { } else {
delService.exec.deleteAsUser(user, subDir, delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
baseDirs.toArray(new Path[0])); .setUser(user)
.setSubDir(subDir)
.setBasedirs(baseDirs.toArray(new Path[0]))
.build());
} }
} catch (IOException e) { } catch (IOException e) {
error = true; error = true;

View File

@ -57,6 +57,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -123,10 +128,14 @@ public class DockerContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, public synchronized void startLocalizer(LocalizerStartContext ctx)
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs(); List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs(); List<String> logDirs = dirsHandler.getLogDirs();
@ -155,10 +164,15 @@ public class DockerContainerExecutor extends ContainerExecutor {
@Override @Override
public int launchContainer(Container container, Path public int launchContainer(ContainerStartContext ctx) throws IOException {
nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, Container container = ctx.getContainer();
String appId, Path containerWorkDir, List<String> localDirs, List<String> Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
logDirs) throws IOException { Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String userName = ctx.getUser();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
//Variables for the launch environment can be injected from the command-line //Variables for the launch environment can be injected from the command-line
//while submitting the application //while submitting the application
String containerImageName = container.getLaunchContext().getEnvironment() String containerImageName = container.getLaunchContext().getEnvironment()
@ -374,8 +388,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public boolean signalContainer(String user, String pid, Signal signal) public boolean signalContainer(ContainerSignalContext ctx)
throws IOException { throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
Signal signal = ctx.getSignal();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
+ " as user " + user); + " as user " + user);
@ -395,8 +413,10 @@ public class DockerContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public boolean isContainerProcessAlive(String user, String pid) public boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException { throws IOException {
String pid = ctx.getPid();
return containerIsAlive(pid); return containerIsAlive(pid);
} }
@ -433,9 +453,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public void deleteAsUser(String user, Path subDir, Path... baseDirs) public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) { Path subDir = ctx.getSubDir();
List<Path> baseDirs = ctx.getBasedirs();
if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + subDir); LOG.info("Deleting absolute path : " + subDir);
if (!lfs.delete(subDir, true)) { if (!lfs.delete(subDir, true)) {
//Maybe retry //Maybe retry

View File

@ -50,6 +50,12 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -214,11 +220,14 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public void startLocalizer(Path nmPrivateContainerTokensPath, public void startLocalizer(LocalizerStartContext ctx)
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs(); List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs(); List<String> logDirs = dirsHandler.getLogDirs();
@ -274,10 +283,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public int launchContainer(Container container, public int launchContainer(ContainerStartContext ctx) throws IOException {
Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath, Container container = ctx.getContainer();
String user, String appId, Path containerWorkDir, Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
List<String> localDirs, List<String> logDirs) throws IOException { Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String user = ctx.getUser();
String appId = ctx.getAppId();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
verifyUsernamePattern(user); verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user); String runAsUser = getRunAsUser(user);
@ -346,7 +360,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
containerExecutorExe, runAsUser, user, Integer containerExecutorExe, runAsUser, user, Integer
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId, .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(), containerIdStr, containerWorkDir.toString(),
nmPrivateCotainerScriptPath.toUri().getPath().toString(), nmPrivateContainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString(), nmPrivateTokensPath.toUri().getPath().toString(),
pidFilePath.toString(), pidFilePath.toString(),
StringUtils.join(",", localDirs), StringUtils.join(",", localDirs),
@ -423,8 +437,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public int reacquireContainer(String user, ContainerId containerId) public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException {
ContainerId containerId = ctx.getContainerId();
try { try {
//Resource handler chain needs to reacquire container state //Resource handler chain needs to reacquire container state
//as well //as well
@ -437,7 +453,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
} }
return super.reacquireContainer(user, containerId); return super.reacquireContainer(ctx);
} finally { } finally {
resourcesHandler.postExecute(containerId); resourcesHandler.postExecute(containerId);
if (resourceHandlerChain != null) { if (resourceHandlerChain != null) {
@ -452,8 +468,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public boolean signalContainer(String user, String pid, Signal signal) public boolean signalContainer(ContainerSignalContext ctx)
throws IOException { throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
Signal signal = ctx.getSignal();
verifyUsernamePattern(user); verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user); String runAsUser = getRunAsUser(user);
@ -487,7 +506,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public void deleteAsUser(String user, Path dir, Path... baseDirs) { public void deleteAsUser(DeletionAsUserContext ctx) {
String user = ctx.getUser();
Path dir = ctx.getSubDir();
List<Path> baseDirs = ctx.getBasedirs();
verifyUsernamePattern(user); verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user); String runAsUser = getRunAsUser(user);
@ -500,7 +523,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
Integer.toString(Commands.DELETE_AS_USER.getValue()), Integer.toString(Commands.DELETE_AS_USER.getValue()),
dirString)); dirString));
List<String> pathsToDelete = new ArrayList<String>(); List<String> pathsToDelete = new ArrayList<String>();
if (baseDirs == null || baseDirs.length == 0) { if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + dir); LOG.info("Deleting absolute path : " + dir);
pathsToDelete.add(dirString); pathsToDelete.add(dirString);
} else { } else {
@ -531,10 +554,17 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public boolean isContainerProcessAlive(String user, String pid) public boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException { throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
// Send a test signal to the process as the user to see if it's alive // Send a test signal to the process as the user to see if it's alive
return signalContainer(user, pid, Signal.NULL); return signalContainer(new ContainerSignalContext.Builder()
.setUser(user)
.setPid(pid)
.setSignal(Signal.NULL)
.build());
} }
public void mountCgroups(List<String> cgroupKVs, String hierarchy) public void mountCgroups(List<String> cgroupKVs, String hierarchy)

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
/** /**
* Windows secure container executor (WSCE). * Windows secure container executor (WSCE).
@ -643,92 +644,94 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
return dst; return dst;
} }
@Override @Override
public void startLocalizer(Path nmPrivateContainerTokens, public void startLocalizer(LocalizerStartContext ctx) throws IOException,
InetSocketAddress nmAddr, String user, String appId, String locId, InterruptedException {
LocalDirsHandlerService dirsHandler) throws IOException, Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InterruptedException { InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
List<String> localDirs = dirsHandler.getLocalDirs(); String appId = ctx.getAppId();
List<String> logDirs = dirsHandler.getLogDirs(); String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
Path classpathJarPrivateDir = dirsHandler.getLocalPathForWrite( List<String> localDirs = dirsHandler.getLocalDirs();
ResourceLocalizationService.NM_PRIVATE_DIR); List<String> logDirs = dirsHandler.getLogDirs();
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs, user);
Path appStorageDir = getWorkingDir(localDirs, user, appId); Path classpathJarPrivateDir = dirsHandler.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR);
String tokenFn = String.format( createUserLocalDirs(localDirs, user);
ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); createUserCacheDirs(localDirs, user);
Path tokenDst = new Path(appStorageDir, tokenFn); createAppDirs(localDirs, user, appId);
copyFile(nmPrivateContainerTokens, tokenDst, user); createAppLogDirs(appId, logDirs, user);
File cwdApp = new File(appStorageDir.toString()); Path appStorageDir = getWorkingDir(localDirs, user, appId);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("cwdApp: %s", cwdApp));
}
List<String> command ;
command = new ArrayList<String>(); String tokenFn = String.format(
ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
//use same jvm as parent File cwdApp = new File(appStorageDir.toString());
File jvm = new File( if (LOG.isDebugEnabled()) {
new File(System.getProperty("java.home"), "bin"), "java.exe"); LOG.debug(String.format("cwdApp: %s", cwdApp));
command.add(jvm.toString()); }
Path cwdPath = new Path(cwdApp.getPath());
// Build a temp classpath jar. See ContainerLaunch.sanitizeEnv().
// Passing CLASSPATH explicitly is *way* too long for command line.
String classPath = System.getProperty("java.class.path");
Map<String, String> env = new HashMap<String, String>(System.getenv());
String jarCp[] = FileUtil.createJarWithClassPath(classPath,
classpathJarPrivateDir, cwdPath, env);
String classPathJar = localizeClasspathJar(
new Path(jarCp[0]), cwdPath, user).toString();
command.add("-classpath");
command.add(classPathJar + jarCp[1]);
String javaLibPath = System.getProperty("java.library.path"); List<String> command ;
if (javaLibPath != null) {
command.add("-Djava.library.path=" + javaLibPath);
}
command.addAll(ContainerLocalizer.getJavaOpts(getConf()));
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr, command = new ArrayList<String>();
localDirs);
//use same jvm as parent
String cmdLine = StringUtils.join(command, " "); File jvm = new File(
new File(System.getProperty("java.home"), "bin"), "java.exe");
String localizerPid = String.format(LOCALIZER_PID_FORMAT, locId); command.add(jvm.toString());
WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor( Path cwdPath = new Path(cwdApp.getPath());
cwdApp.getAbsolutePath(),
localizerPid, user, "nul:", cmdLine); // Build a temp classpath jar. See ContainerLaunch.sanitizeEnv().
try { // Passing CLASSPATH explicitly is *way* too long for command line.
stubExecutor.execute(); String classPath = System.getProperty("java.class.path");
stubExecutor.validateResult(); Map<String, String> env = new HashMap<String, String>(System.getenv());
} String jarCp[] = FileUtil.createJarWithClassPath(classPath,
finally { classpathJarPrivateDir, cwdPath, env);
stubExecutor.close(); String classPathJar = localizeClasspathJar(
try new Path(jarCp[0]), cwdPath, user).toString();
{ command.add("-classpath");
killContainer(localizerPid, Signal.KILL); command.add(classPathJar + jarCp[1]);
}
catch(Throwable e) { String javaLibPath = System.getProperty("java.library.path");
LOG.warn(String.format( if (javaLibPath != null) {
"An exception occured during the cleanup of localizer job %s:%n%s", command.add("-Djava.library.path=" + javaLibPath);
localizerPid, }
org.apache.hadoop.util.StringUtils.stringifyException(e))); command.addAll(ContainerLocalizer.getJavaOpts(getConf()));
}
} ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
} localDirs);
@Override String cmdLine = StringUtils.join(command, " ");
String localizerPid = String.format(LOCALIZER_PID_FORMAT, locId);
WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor(
cwdApp.getAbsolutePath(),
localizerPid, user, "nul:", cmdLine);
try {
stubExecutor.execute();
stubExecutor.validateResult();
} finally {
stubExecutor.close();
try
{
killContainer(localizerPid, Signal.KILL);
}
catch(Throwable e) {
LOG.warn(String.format(
"An exception occured during the cleanup of localizer job %s:%n%s",
localizerPid,
org.apache.hadoop.util.StringUtils.stringifyException(e)));
}
}
}
@Override
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
String containerIdStr, String userName, Path pidFile, Resource resource, String containerIdStr, String userName, Path pidFile, Resource resource,
File wordDir, Map<String, String> environment) throws IOException { File wordDir, Map<String, String> environment) throws IOException {

View File

@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.WindowsSecureContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.WindowsSecureContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
@ -299,9 +301,16 @@ public class ContainerLaunch implements Callable<Integer> {
} }
else { else {
exec.activateContainer(containerID, pidFilePath); exec.activateContainer(containerID, pidFilePath);
ret = exec.launchContainer(container, nmPrivateContainerScriptPath, ret = exec.launchContainer(new ContainerStartContext.Builder()
nmPrivateTokensPath, user, appIdStr, containerWorkDir, .setContainer(container)
localDirs, logDirs); .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
.setNmPrivateTokensPath(nmPrivateTokensPath)
.setUser(user)
.setAppId(appIdStr)
.setContainerWorkDir(containerWorkDir)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.build());
} }
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed to launch container.", e); LOG.warn("Failed to launch container.", e);
@ -416,7 +425,12 @@ public class ContainerLaunch implements Callable<Integer> {
? Signal.TERM ? Signal.TERM
: Signal.KILL; : Signal.KILL;
boolean result = exec.signalContainer(user, processId, signal); boolean result = exec.signalContainer(
new ContainerSignalContext.Builder()
.setUser(user)
.setPid(processId)
.setSignal(signal)
.build());
LOG.debug("Sent signal " + signal + " to pid " + processId LOG.debug("Sent signal " + signal + " to pid " + processId
+ " as user " + user + " as user " + user

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
/** /**
@ -80,7 +81,11 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
String pidPathStr = pidFile.getPath(); String pidPathStr = pidFile.getPath();
pidFilePath = new Path(pidPathStr); pidFilePath = new Path(pidPathStr);
exec.activateContainer(containerId, pidFilePath); exec.activateContainer(containerId, pidFilePath);
retCode = exec.reacquireContainer(container.getUser(), containerId); retCode = exec.reacquireContainer(
new ContainerReacquisitionContext.Builder()
.setUser(container.getUser())
.setContainerId(containerId)
.build());
} else { } else {
LOG.warn("Unable to locate pid file for container " + containerIdStr); LOG.warn("Unable to locate pid file for container " + containerIdStr);
} }

View File

@ -124,6 +124,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@ -1135,13 +1136,15 @@ public class ResourceLocalizationService extends CompositeService
writeCredentials(nmPrivateCTokensPath); writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait // 2) exec initApplication and wait
if (dirsHandler.areDisksHealthy()) { if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, exec.startLocalizer(new LocalizerStartContext.Builder()
context.getUser(), .setNmPrivateContainerTokens(nmPrivateCTokensPath)
ConverterUtils.toString( .setNmAddr(localizationServerAddress)
context.getContainerId(). .setUser(context.getUser())
getApplicationAttemptId().getApplicationId()), .setAppId(ConverterUtils.toString(context.getContainerId()
localizerId, .getApplicationAttemptId().getApplicationId()))
dirsHandler); .setLocId(localizerId)
.setDirsHandler(dirsHandler)
.build());
} else { } else {
throw new IOException("All disks failed. " throw new IOException("All disks failed. "
+ dirsHandler.getDisksHealthReport(false)); + dirsHandler.getDisksHealthReport(false));

View File

@ -0,0 +1,70 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Encapsulates information required for container liveness checks.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerLivenessContext {
private final String user;
private final String pid;
public static final class Builder {
private String user;
private String pid;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setPid(String pid) {
this.pid = pid;
return this;
}
public ContainerLivenessContext build() {
return new ContainerLivenessContext(this);
}
}
private ContainerLivenessContext(Builder builder) {
this.user = builder.user;
this.pid = builder.pid;
}
public String getUser() {
return this.user;
}
public String getPid() {
return this.pid;
}
}

View File

@ -0,0 +1,71 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Encapsulates information required for container reacquisition.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerReacquisitionContext {
private final String user;
private final ContainerId containerId;
public static final class Builder {
private String user;
private ContainerId containerId;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setContainerId(ContainerId containerId) {
this.containerId = containerId;
return this;
}
public ContainerReacquisitionContext build() {
return new ContainerReacquisitionContext(this);
}
}
private ContainerReacquisitionContext(Builder builder) {
this.user = builder.user;
this.containerId = builder.containerId;
}
public String getUser() {
return this.user;
}
public ContainerId getContainerId() {
return this.containerId;
}
}

View File

@ -0,0 +1,83 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
/**
* Encapsulates information required for container signaling.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerSignalContext {
private final String user;
private final String pid;
private final Signal signal;
public static final class Builder {
private String user;
private String pid;
private Signal signal;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setPid(String pid) {
this.pid = pid;
return this;
}
public Builder setSignal(Signal signal) {
this.signal = signal;
return this;
}
public ContainerSignalContext build() {
return new ContainerSignalContext(this);
}
}
private ContainerSignalContext(Builder builder) {
this.user = builder.user;
this.pid = builder.pid;
this.signal = builder.signal;
}
public String getUser() {
return this.user;
}
public String getPid() {
return this.pid;
}
public Signal getSignal() {
return this.signal;
}
}

View File

@ -0,0 +1,147 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import java.util.List;
/**
* Encapsulates information required for starting/launching containers.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerStartContext {
private final Container container;
private final Path nmPrivateContainerScriptPath;
private final Path nmPrivateTokensPath;
private final String user;
private final String appId;
private final Path containerWorkDir;
private final List<String> localDirs;
private final List<String> logDirs;
public static final class Builder {
private Container container;
private Path nmPrivateContainerScriptPath;
private Path nmPrivateTokensPath;
private String user;
private String appId;
private Path containerWorkDir;
private List<String> localDirs;
private List<String> logDirs;
public Builder() {
}
public Builder setContainer(Container container) {
this.container = container;
return this;
}
public Builder setNmPrivateContainerScriptPath(
Path nmPrivateContainerScriptPath) {
this.nmPrivateContainerScriptPath = nmPrivateContainerScriptPath;
return this;
}
public Builder setNmPrivateTokensPath(Path nmPrivateTokensPath) {
this.nmPrivateTokensPath = nmPrivateTokensPath;
return this;
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setAppId(String appId) {
this.appId = appId;
return this;
}
public Builder setContainerWorkDir(Path containerWorkDir) {
this.containerWorkDir = containerWorkDir;
return this;
}
public Builder setLocalDirs(List<String> localDirs) {
this.localDirs = localDirs;
return this;
}
public Builder setLogDirs(List<String> logDirs) {
this.logDirs = logDirs;
return this;
}
public ContainerStartContext build() {
return new ContainerStartContext(this);
}
}
private ContainerStartContext(Builder builder) {
this.container = builder.container;
this.nmPrivateContainerScriptPath = builder.nmPrivateContainerScriptPath;
this.nmPrivateTokensPath = builder.nmPrivateTokensPath;
this.user = builder.user;
this.appId = builder.appId;
this.containerWorkDir = builder.containerWorkDir;
this.localDirs = builder.localDirs;
this.logDirs = builder.logDirs;
}
public Container getContainer() {
return this.container;
}
public Path getNmPrivateContainerScriptPath() {
return this.nmPrivateContainerScriptPath;
}
public Path getNmPrivateTokensPath() {
return this.nmPrivateTokensPath;
}
public String getUser() {
return this.user;
}
public String getAppId() {
return this.appId;
}
public Path getContainerWorkDir() {
return this.containerWorkDir;
}
public List<String> getLocalDirs() {
return this.localDirs;
}
public List<String> getLogDirs() {
return this.logDirs;
}
}

View File

@ -0,0 +1,91 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Encapsulates information required for deletions as a given user.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class DeletionAsUserContext {
private final String user;
private final Path subDir;
private final List<Path> basedirs;
public static final class Builder {
private String user;
private Path subDir;
private List<Path> basedirs;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setSubDir(Path subDir) {
this.subDir = subDir;
return this;
}
public Builder setBasedirs(Path... basedirs) {
this.basedirs = Arrays.asList(basedirs);
return this;
}
public DeletionAsUserContext build() {
return new DeletionAsUserContext(this);
}
}
private DeletionAsUserContext(Builder builder) {
this.user = builder.user;
this.subDir = builder.subDir;
this.basedirs = builder.basedirs;
}
public String getUser() {
return this.user;
}
public Path getSubDir() {
return this.subDir;
}
public List<Path> getBasedirs() {
if (this.basedirs != null) {
return Collections.unmodifiableList(this.basedirs);
} else {
return null;
}
}
}

View File

@ -0,0 +1,122 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import java.net.InetSocketAddress;
/**
* Encapsulates information required for starting a localizer.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class LocalizerStartContext {
private final Path nmPrivateContainerTokens;
private final InetSocketAddress nmAddr;
private final String user;
private final String appId;
private final String locId;
private final LocalDirsHandlerService dirsHandler;
public static final class Builder {
private Path nmPrivateContainerTokens;
private InetSocketAddress nmAddr;
private String user;
private String appId;
private String locId;
private LocalDirsHandlerService dirsHandler;
public Builder() {
}
public Builder setNmPrivateContainerTokens(Path nmPrivateContainerTokens) {
this.nmPrivateContainerTokens = nmPrivateContainerTokens;
return this;
}
public Builder setNmAddr(InetSocketAddress nmAddr) {
this.nmAddr = nmAddr;
return this;
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setAppId(String appId) {
this.appId = appId;
return this;
}
public Builder setLocId(String locId) {
this.locId = locId;
return this;
}
public Builder setDirsHandler(LocalDirsHandlerService dirsHandler) {
this.dirsHandler = dirsHandler;
return this;
}
public LocalizerStartContext build() {
return new LocalizerStartContext(this);
}
}
private LocalizerStartContext(Builder builder) {
this.nmPrivateContainerTokens = builder.nmPrivateContainerTokens;
this.nmAddr = builder.nmAddr;
this.user = builder.user;
this.appId = builder.appId;
this.locId = builder.locId;
this.dirsHandler = builder.dirsHandler;
}
public Path getNmPrivateContainerTokens() {
return this.nmPrivateContainerTokens;
}
public InetSocketAddress getNmAddr() {
return this.nmAddr;
}
public String getUser() {
return this.user;
}
public String getAppId() {
return this.appId;
}
public String getLocId() {
return this.locId;
}
public LocalDirsHandlerService getDirsHandler() {
return this.dirsHandler;
}
}

View File

@ -72,6 +72,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -309,13 +312,26 @@ public class TestDefaultContainerExecutor {
mockExec.init(); mockExec.init();
mockExec.activateContainer(cId, pidFile); mockExec.activateContainer(cId, pidFile);
int ret = mockExec int ret = mockExec.launchContainer(new ContainerStartContext.Builder()
.launchContainer(container, scriptPath, tokensPath, appSubmitter, .setContainer(container)
appId, workDir, localDirs, localDirs); .setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.build());
Assert.assertNotSame(0, ret); Assert.assertNotSame(0, ret);
} finally { } finally {
mockExec.deleteAsUser(appSubmitter, localDir); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
mockExec.deleteAsUser(appSubmitter, logDir); .setUser(appSubmitter)
.setSubDir(localDir)
.build());
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(logDir)
.build());
} }
} }
@ -410,14 +426,29 @@ public class TestDefaultContainerExecutor {
when(dirsHandler.getLogDirs()).thenReturn(logDirs); when(dirsHandler.getLogDirs()).thenReturn(logDirs);
try { try {
mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, mockExec.startLocalizer(new LocalizerStartContext.Builder()
appSubmitter, appId, locId, dirsHandler); .setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(localizationServerAddress)
.setUser(appSubmitter)
.setAppId(appId)
.setLocId(locId)
.setDirsHandler(dirsHandler)
.build());
} catch (IOException e) { } catch (IOException e) {
Assert.fail("StartLocalizer failed to copy token file " + e); Assert.fail("StartLocalizer failed to copy token file " + e);
} finally { } finally {
mockExec.deleteAsUser(appSubmitter, firstDir); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
mockExec.deleteAsUser(appSubmitter, secondDir); .setUser(appSubmitter)
mockExec.deleteAsUser(appSubmitter, logDir); .setSubDir(firstDir)
.build());
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(secondDir)
.build());
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(logDir)
.build());
deleteTmpFiles(); deleteTmpFiles();
} }
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
@ -80,14 +81,28 @@ public class TestDeletionService {
static class FakeDefaultContainerExecutor extends DefaultContainerExecutor { static class FakeDefaultContainerExecutor extends DefaultContainerExecutor {
@Override @Override
public void deleteAsUser(String user, Path subDir, Path... basedirs) public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException {
String user = ctx.getUser();
Path subDir = ctx.getSubDir();
List<Path> basedirs = ctx.getBasedirs();
if ((Long.parseLong(subDir.getName()) % 2) == 0) { if ((Long.parseLong(subDir.getName()) % 2) == 0) {
assertNull(user); assertNull(user);
} else { } else {
assertEquals("dingo", user); assertEquals("dingo", user);
} }
super.deleteAsUser(user, subDir, basedirs);
DeletionAsUserContext.Builder builder = new DeletionAsUserContext
.Builder()
.setUser(user)
.setSubDir(subDir);
if (basedirs != null) {
builder.setBasedirs(basedirs.toArray(new Path[basedirs.size()]));
}
super.deleteAsUser(builder.build());
assertFalse(lfs.util().exists(subDir)); assertFalse(lfs.util().exists(subDir));
} }
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -179,9 +180,16 @@ public class TestDockerContainerExecutor {
Path pidFile = new Path(workDir, "pid.txt"); Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile); exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath, return exec.launchContainer(new ContainerStartContext.Builder()
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), .setContainer(container)
dirsHandler.getLogDirs()); .setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
} }
// Write the script used to launch the docker container in a temp file // Write the script used to launch the docker container in a temp file

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -149,9 +150,16 @@ public class TestDockerContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt"); Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile); dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, dockerContainerExecutor.launchContainer(new ContainerStartContext.Builder()
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), .setContainer(container)
dirsHandler.getLogDirs()); .setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
@ -185,9 +193,17 @@ public class TestDockerContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt"); Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile); dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, dockerContainerExecutor.launchContainer(
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), new ContainerStartContext.Builder()
dirsHandler.getLogDirs()); .setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
} }
@Test @Test
@ -219,9 +235,17 @@ public class TestDockerContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid"); Path pidFile = new Path(workDir, "pid");
dockerContainerExecutor.activateContainer(cId, pidFile); dockerContainerExecutor.activateContainer(cId, pidFile);
int ret = dockerContainerExecutor.launchContainer(container, scriptPath, int ret = dockerContainerExecutor.launchContainer(
tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), new ContainerStartContext.Builder()
dirsHandler.getLogDirs()); .setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
assertEquals(0, ret); assertEquals(0, ret);
//get the script //get the script
Path sessionScriptPath = new Path(workDir, Path sessionScriptPath = new Path(workDir,

View File

@ -60,6 +60,11 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -208,7 +213,10 @@ public class TestLinuxContainerExecutor {
Path usercachedir = new Path(dir, ContainerLocalizer.USERCACHE); Path usercachedir = new Path(dir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usercachedir, user); Path userdir = new Path(usercachedir, user);
Path appcachedir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appcachedir = new Path(userdir, ContainerLocalizer.APPCACHE);
exec.deleteAsUser(user, appcachedir); exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(appcachedir)
.build());
FileContext.getLocalFSFileContext().delete(usercachedir, true); FileContext.getLocalFSFileContext().delete(usercachedir, true);
} }
} }
@ -218,7 +226,10 @@ public class TestLinuxContainerExecutor {
for (String dir : localDirs) { for (String dir : localDirs) {
Path filecache = new Path(dir, ContainerLocalizer.FILECACHE); Path filecache = new Path(dir, ContainerLocalizer.FILECACHE);
Path filedir = new Path(filecache, user); Path filedir = new Path(filecache, user);
exec.deleteAsUser(user, filedir); exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(filedir)
.build());
} }
} }
@ -229,7 +240,10 @@ public class TestLinuxContainerExecutor {
String containerId = "CONTAINER_" + (id - 1); String containerId = "CONTAINER_" + (id - 1);
Path appdir = new Path(dir, appId); Path appdir = new Path(dir, appId);
Path containerdir = new Path(appdir, containerId); Path containerdir = new Path(appdir, containerId);
exec.deleteAsUser(user, containerdir); exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(containerdir)
.build());
} }
} }
@ -244,7 +258,11 @@ public class TestLinuxContainerExecutor {
for (String file : files) { for (String file : files) {
File f = new File(workSpace, file); File f = new File(workSpace, file);
if (f.exists()) { if (f.exists()) {
exec.deleteAsUser(user, new Path(file), ws); exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(new Path(file))
.setBasedirs(ws)
.build());
} }
} }
} }
@ -310,9 +328,16 @@ public class TestLinuxContainerExecutor {
Path pidFile = new Path(workDir, "pid.txt"); Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile); exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath, return exec.launchContainer(new ContainerStartContext.Builder()
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), .setContainer(container)
dirsHandler.getLogDirs()); .setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
} }
@Test @Test
@ -345,8 +370,14 @@ public class TestLinuxContainerExecutor {
}; };
exec.setConf(conf); exec.setConf(conf);
exec.startLocalizer(nmPrivateContainerTokensPath, nmAddr, appSubmitter, exec.startLocalizer(new LocalizerStartContext.Builder()
appId, locId, dirsHandler); .setNmPrivateContainerTokens(nmPrivateContainerTokensPath)
.setNmAddr(nmAddr)
.setUser(appSubmitter)
.setAppId(appId)
.setLocId(locId)
.setDirsHandler(dirsHandler)
.build());
String locId2 = "container_01_02"; String locId2 = "container_01_02";
Path nmPrivateContainerTokensPath2 = Path nmPrivateContainerTokensPath2 =
@ -355,8 +386,16 @@ public class TestLinuxContainerExecutor {
+ Path.SEPARATOR + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId2)); + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId2));
files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE)); files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
exec.startLocalizer(nmPrivateContainerTokensPath2, nmAddr, appSubmitter, exec.startLocalizer(new LocalizerStartContext.Builder()
appId, locId2, dirsHandler); .setNmPrivateContainerTokens(nmPrivateContainerTokensPath2)
.setNmAddr(nmAddr)
.setUser(appSubmitter)
.setAppId(appId)
.setLocId(locId2)
.setDirsHandler(dirsHandler)
.build());
cleanupUserAppCache(appSubmitter); cleanupUserAppCache(appSubmitter);
} }
@ -429,7 +468,11 @@ public class TestLinuxContainerExecutor {
assertNotNull(pid); assertNotNull(pid);
LOG.info("Going to killing the process."); LOG.info("Going to killing the process.");
exec.signalContainer(appSubmitter, pid, Signal.TERM); exec.signalContainer(new ContainerSignalContext.Builder()
.setUser(appSubmitter)
.setPid(pid)
.setSignal(Signal.TERM)
.build());
LOG.info("sleeping for 100ms to let the sleep be killed"); LOG.info("sleeping for 100ms to let the sleep be killed");
Thread.sleep(100); Thread.sleep(100);
@ -586,7 +629,10 @@ public class TestLinuxContainerExecutor {
} catch (IOException e) { } catch (IOException e) {
// expected if LCE isn't setup right, but not necessary for this test // expected if LCE isn't setup right, but not necessary for this test
} }
lce.reacquireContainer("foouser", cid); lce.reacquireContainer(new ContainerReacquisitionContext.Builder()
.setUser("foouser")
.setContainerId(cid)
.build());
assertTrue("postExec not called after reacquisition", assertTrue("postExec not called after reacquisition",
TestResourceHandler.postExecContainers.contains(cid)); TestResourceHandler.postExecContainers.contains(cid));
} }

View File

@ -49,6 +49,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert; import org.junit.Assert;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -130,9 +134,16 @@ public class TestLinuxContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt"); Path pidFile = new Path(workDir, "pid.txt");
mockExec.activateContainer(cId, pidFile); mockExec.activateContainer(cId, pidFile);
int ret = mockExec.launchContainer(container, scriptPath, tokensPath, int ret = mockExec.launchContainer(new ContainerStartContext.Builder()
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), .setContainer(container)
dirsHandler.getLogDirs()); .setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
assertEquals(0, ret); assertEquals(0, ret);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, appId, containerId, appSubmitter, cmd, appId, containerId,
@ -185,7 +196,15 @@ public class TestLinuxContainerExecutorWithMocks {
Path nmPrivateCTokensPath= new Path("file:///bin/nmPrivateCTokensPath"); Path nmPrivateCTokensPath= new Path("file:///bin/nmPrivateCTokensPath");
try { try {
mockExec.startLocalizer(nmPrivateCTokensPath, address, "test", "application_0", "12345", dirsHandler); mockExec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(address)
.setUser("test")
.setAppId("application_0")
.setLocId("12345")
.setDirsHandler(dirsHandler)
.build());
List<String> result=readMockParams(); List<String> result=readMockParams();
Assert.assertEquals(result.size(), 18); Assert.assertEquals(result.size(), 18);
Assert.assertEquals(result.get(0), YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER); Assert.assertEquals(result.get(0), YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
@ -278,9 +297,17 @@ public class TestLinuxContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt"); Path pidFile = new Path(workDir, "pid.txt");
mockExec.activateContainer(cId, pidFile); mockExec.activateContainer(cId, pidFile);
int ret = mockExec.launchContainer(container, scriptPath, tokensPath, int ret = mockExec.launchContainer(new ContainerStartContext.Builder()
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), .setContainer(container)
dirsHandler.getLogDirs()); .setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
Assert.assertNotSame(0, ret); Assert.assertNotSame(0, ret);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, appId, containerId, appSubmitter, cmd, appId, containerId,
@ -308,7 +335,11 @@ public class TestLinuxContainerExecutorWithMocks {
ContainerExecutor.Signal signal = ContainerExecutor.Signal.QUIT; ContainerExecutor.Signal signal = ContainerExecutor.Signal.QUIT;
String sigVal = String.valueOf(signal.getValue()); String sigVal = String.valueOf(signal.getValue());
mockExec.signalContainer(appSubmitter, "1000", signal); mockExec.signalContainer(new ContainerSignalContext.Builder()
.setUser(appSubmitter)
.setPid("1000")
.setSignal(signal)
.build());
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "1000", sigVal), appSubmitter, cmd, "1000", sigVal),
readMockParams()); readMockParams());
@ -324,24 +355,41 @@ public class TestLinuxContainerExecutorWithMocks {
Path baseDir0 = new Path("/grid/0/BaseDir"); Path baseDir0 = new Path("/grid/0/BaseDir");
Path baseDir1 = new Path("/grid/1/BaseDir"); Path baseDir1 = new Path("/grid/1/BaseDir");
mockExec.deleteAsUser(appSubmitter, dir); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, .setUser(appSubmitter)
appSubmitter, cmd, "/tmp/testdir"), .setSubDir(dir)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "/tmp/testdir"),
readMockParams()); readMockParams());
mockExec.deleteAsUser(appSubmitter, null); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, .setUser(appSubmitter)
appSubmitter, cmd, ""), .build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, ""),
readMockParams()); readMockParams());
mockExec.deleteAsUser(appSubmitter, testFile, baseDir0, baseDir1); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, .setUser(appSubmitter)
appSubmitter, cmd, testFile.toString(), baseDir0.toString(), baseDir1.toString()), .setSubDir(testFile)
.setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, testFile.toString(), baseDir0.toString(),
baseDir1.toString()),
readMockParams()); readMockParams());
mockExec.deleteAsUser(appSubmitter, null, baseDir0, baseDir1); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, .setUser(appSubmitter)
appSubmitter, cmd, "", baseDir0.toString(), baseDir1.toString()), .setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "", baseDir0.toString(), baseDir1.toString()),
readMockParams()); readMockParams());
File f = new File("./src/test/resources/mock-container-executer-with-error"); File f = new File("./src/test/resources/mock-container-executer-with-error");
@ -353,22 +401,38 @@ public class TestLinuxContainerExecutorWithMocks {
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath); conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
mockExec.setConf(conf); mockExec.setConf(conf);
mockExec.deleteAsUser(appSubmitter, dir); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, .setUser(appSubmitter)
appSubmitter, cmd, "/tmp/testdir"), .setSubDir(dir)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "/tmp/testdir"),
readMockParams()); readMockParams());
mockExec.deleteAsUser(appSubmitter, null); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, .setUser(appSubmitter)
appSubmitter, cmd, ""), .build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, ""),
readMockParams()); readMockParams());
mockExec.deleteAsUser(appSubmitter, testFile, baseDir0, baseDir1); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, .setUser(appSubmitter)
appSubmitter, cmd, testFile.toString(), baseDir0.toString(), baseDir1.toString()), .setSubDir(testFile)
.setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, testFile.toString(), baseDir0.toString(),
baseDir1.toString()),
readMockParams()); readMockParams());
mockExec.deleteAsUser(appSubmitter, null, baseDir0, baseDir1); mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "", baseDir0.toString(), baseDir1.toString()), appSubmitter, cmd, "", baseDir0.toString(), baseDir1.toString()),
readMockParams()); readMockParams());

View File

@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -255,8 +256,11 @@ public abstract class BaseContainerManagerTest {
if (containerManager != null) { if (containerManager != null) {
containerManager.stop(); containerManager.stop();
} }
createContainerExecutor().deleteAsUser(user, createContainerExecutor().deleteAsUser(new DeletionAsUserContext.Builder()
new Path(localDir.getAbsolutePath()), new Path[] {}); .setUser(user)
.setSubDir(new Path(localDir.getAbsolutePath()))
.setBasedirs(new Path[] {})
.build());
} }
public static void waitForContainerState(ContainerManagementProtocol containerManager, public static void waitForContainerState(ContainerManagementProtocol containerManager,

View File

@ -63,6 +63,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
@ -939,11 +940,16 @@ public class TestResourceLocalizationService {
dispatcher.await(); dispatcher.await();
String appStr = ConverterUtils.toString(appId); String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerId().toString(); String ctnrStr = c.getContainerId().toString();
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class); ArgumentCaptor<LocalizerStartContext> contextCaptor = ArgumentCaptor
verify(exec).startLocalizer(tokenPathCaptor.capture(), .forClass(LocalizerStartContext.class);
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr), verify(exec).startLocalizer(contextCaptor.capture());
isA(LocalDirsHandlerService.class));
Path localizationTokenPath = tokenPathCaptor.getValue(); LocalizerStartContext context = contextCaptor.getValue();
Path localizationTokenPath = context.getNmPrivateContainerTokens();
assertEquals("user0", context.getUser());
assertEquals(appStr, context.getAppId());
assertEquals(ctnrStr, context.getLocId());
// heartbeat from localizer // heartbeat from localizer
LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class); LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Tes
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -165,8 +166,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
@Override @Override
public void tearDown() throws IOException, InterruptedException { public void tearDown() throws IOException, InterruptedException {
super.tearDown(); super.tearDown();
createContainerExecutor().deleteAsUser(user, createContainerExecutor().deleteAsUser(new DeletionAsUserContext.Builder()
new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {}); .setUser(user)
.setSubDir(new Path(remoteRootLogDir.getAbsolutePath()))
.setBasedirs(new Path[] {})
.build());
dispatcher.await(); dispatcher.await();
dispatcher.stop(); dispatcher.stop();
dispatcher.close(); dispatcher.close();

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@ -288,8 +289,11 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
// Assert that the process is not alive anymore // Assert that the process is not alive anymore
Assert.assertFalse("Process is still alive!", Assert.assertFalse("Process is still alive!",
exec.signalContainer(user, exec.signalContainer(new ContainerSignalContext.Builder()
pid, Signal.NULL)); .setUser(user)
.setPid(pid)
.setSignal(Signal.NULL)
.build()));
} }
@Test(timeout = 20000) @Test(timeout = 20000)