YARN-3684. Changed ContainerExecutor's primary lifecycle methods to use a more extensible mechanism of context objects. Contributed by Sidharta Seethana.

(cherry picked from commit 53fafcf061)
This commit is contained in:
Vinod Kumar Vavilapalli 2015-05-21 15:50:23 -07:00
parent e8ac88d4fe
commit 673bd9708f
26 changed files with 1163 additions and 242 deletions

View File

@ -208,6 +208,9 @@ Release 2.8.0 - UNRELEASED
YARN-3583. Support of NodeLabel object instead of plain String
in YarnClient side. (Sunil G via wangda)
YARN-3684. Changed ContainerExecutor's primary lifecycle methods to use a more
extensible mechanism of context objects. (Sidharta Seethana via vinodkv)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -46,6 +45,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@ -111,61 +116,67 @@ public abstract class ContainerExecutor implements Configurable {
* For $rsrc in job resources
* Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef
* </pre>
* @param user user name of application owner
* @param appId id of the application
* @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
* @param nmAddr RPC address to contact NM
* @param dirsHandler NM local dirs service, for nm-local-dirs and nm-log-dirs
* @param ctx LocalizerStartContext that encapsulates necessary information
* for starting a localizer.
* @throws IOException For most application init failures
* @throws InterruptedException If application init thread is halted by NM
*/
public abstract void startLocalizer(Path nmPrivateContainerTokens,
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
public abstract void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException;
/**
* Launch the container on the node. This is a blocking call and returns only
* when the container exits.
* @param container the container to be launched
* @param nmPrivateContainerScriptPath the path for launch script
* @param nmPrivateTokensPath the path for tokens for the container
* @param user the user of the container
* @param appId the appId of the container
* @param containerWorkDir the work dir for the container
* @param localDirs nm-local-dirs to be used for this container
* @param logDirs nm-log-dirs to be used for this container
* @param ctx Encapsulates information necessary for launching containers.
* @return the return status of the launch
* @throws IOException
*/
public abstract int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
String user, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException;
public abstract int launchContainer(ContainerStartContext ctx) throws
IOException;
public abstract boolean signalContainer(String user, String pid,
Signal signal)
/**
* Signal container with the specified signal.
* @param ctx Encapsulates information necessary for signaling containers.
* @return returns true if the operation succeeded
* @throws IOException
*/
public abstract boolean signalContainer(ContainerSignalContext ctx)
throws IOException;
public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
/**
* Delete specified directories as a given user.
* @param ctx Encapsulates information necessary for deletion.
* @throws IOException
* @throws InterruptedException
*/
public abstract void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException;
public abstract boolean isContainerProcessAlive(String user, String pid)
/**
* Check if a container is alive.
* @param ctx Encapsulates information necessary for container liveness check.
* @return true if container is still alive
* @throws IOException
*/
public abstract boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException;
/**
* Recover an already existing container. This is a blocking call and returns
* only when the container exits. Note that the container must have been
* activated prior to this call.
* @param user the user of the container
* @param containerId The ID of the container to reacquire
* @param ctx encapsulates information necessary to reacquire container
* @return The exit code of the pre-existing container
* @throws IOException
* @throws InterruptedException
*/
public int reacquireContainer(String user, ContainerId containerId)
public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException {
String user = ctx.getUser();
ContainerId containerId = ctx.getContainerId();
Path pidPath = getPidFilePath(containerId);
if (pidPath == null) {
LOG.warn(containerId + " is not active, returning terminated error");
@ -179,7 +190,12 @@ public abstract class ContainerExecutor implements Configurable {
}
LOG.info("Reacquiring " + containerId + " with pid " + pid);
while(isContainerProcessAlive(user, pid)) {
ContainerLivenessContext livenessContext = new ContainerLivenessContext
.Builder()
.setUser(user)
.setPid(pid)
.build();
while(isContainerProcessAlive(livenessContext)) {
Thread.sleep(1000);
}
@ -486,7 +502,11 @@ public abstract class ContainerExecutor implements Configurable {
public void run() {
try {
Thread.sleep(delay);
containerExecutor.signalContainer(user, pid, signal);
containerExecutor.signalContainer(new ContainerSignalContext.Builder()
.setUser(user)
.setPid(pid)
.setSignal(signal)
.build());
} catch (InterruptedException e) {
return;
} catch (IOException e) {

View File

@ -55,6 +55,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@ -94,10 +99,14 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
@Override
public void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
public void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
@ -130,11 +139,15 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
@Override
public int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
String user, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException {
public int launchContainer(ContainerStartContext ctx) throws IOException {
Container container = ctx.getContainer();
Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String user = ctx.getUser();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerId();
@ -394,8 +407,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
@Override
public boolean signalContainer(String user, String pid, Signal signal)
public boolean signalContainer(ContainerSignalContext ctx)
throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
Signal signal = ctx.getSignal();
LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
+ " as user " + user);
if (!containerIsAlive(pid)) {
@ -413,8 +430,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
@Override
public boolean isContainerProcessAlive(String user, String pid)
public boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException {
String pid = ctx.getPid();
return containerIsAlive(pid);
}
@ -451,9 +470,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
@Override
public void deleteAsUser(String user, Path subDir, Path... baseDirs)
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) {
Path subDir = ctx.getSubDir();
List<Path> baseDirs = ctx.getBasedirs();
if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + subDir);
if (!lfs.delete(subDir, true)) {
//Maybe retry

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
@ -290,10 +291,16 @@ public class DeletionService extends AbstractService {
try {
LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
if (baseDirs == null || baseDirs.size() == 0) {
delService.exec.deleteAsUser(user, subDir, (Path[])null);
delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(subDir)
.build());
} else {
delService.exec.deleteAsUser(user, subDir,
baseDirs.toArray(new Path[0]));
delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(subDir)
.setBasedirs(baseDirs.toArray(new Path[0]))
.build());
}
} catch (IOException e) {
error = true;

View File

@ -57,6 +57,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@ -123,10 +128,14 @@ public class DockerContainerExecutor extends ContainerExecutor {
}
@Override
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
public synchronized void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
@ -155,10 +164,15 @@ public class DockerContainerExecutor extends ContainerExecutor {
@Override
public int launchContainer(Container container, Path
nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName,
String appId, Path containerWorkDir, List<String> localDirs, List<String>
logDirs) throws IOException {
public int launchContainer(ContainerStartContext ctx) throws IOException {
Container container = ctx.getContainer();
Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String userName = ctx.getUser();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
//Variables for the launch environment can be injected from the command-line
//while submitting the application
String containerImageName = container.getLaunchContext().getEnvironment()
@ -374,8 +388,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
}
@Override
public boolean signalContainer(String user, String pid, Signal signal)
public boolean signalContainer(ContainerSignalContext ctx)
throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
Signal signal = ctx.getSignal();
if (LOG.isDebugEnabled()) {
LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
+ " as user " + user);
@ -395,8 +413,10 @@ public class DockerContainerExecutor extends ContainerExecutor {
}
@Override
public boolean isContainerProcessAlive(String user, String pid)
public boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException {
String pid = ctx.getPid();
return containerIsAlive(pid);
}
@ -433,9 +453,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
}
@Override
public void deleteAsUser(String user, Path subDir, Path... baseDirs)
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) {
Path subDir = ctx.getSubDir();
List<Path> baseDirs = ctx.getBasedirs();
if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + subDir);
if (!lfs.delete(subDir, true)) {
//Maybe retry

View File

@ -50,6 +50,12 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -216,11 +222,14 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
@Override
public void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
public void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
@ -276,10 +285,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
@Override
public int launchContainer(Container container,
Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
String user, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException {
public int launchContainer(ContainerStartContext ctx) throws IOException {
Container container = ctx.getContainer();
Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String user = ctx.getUser();
String appId = ctx.getAppId();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user);
@ -348,7 +362,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
containerExecutorExe, runAsUser, user, Integer
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(),
nmPrivateCotainerScriptPath.toUri().getPath().toString(),
nmPrivateContainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString(),
pidFilePath.toString(),
StringUtils.join(",", localDirs),
@ -425,8 +439,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
@Override
public int reacquireContainer(String user, ContainerId containerId)
public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException {
ContainerId containerId = ctx.getContainerId();
try {
//Resource handler chain needs to reacquire container state
//as well
@ -439,7 +455,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
}
return super.reacquireContainer(user, containerId);
return super.reacquireContainer(ctx);
} finally {
resourcesHandler.postExecute(containerId);
if (resourceHandlerChain != null) {
@ -454,8 +470,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
@Override
public boolean signalContainer(String user, String pid, Signal signal)
public boolean signalContainer(ContainerSignalContext ctx)
throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
Signal signal = ctx.getSignal();
verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user);
@ -489,7 +508,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
@Override
public void deleteAsUser(String user, Path dir, Path... baseDirs) {
public void deleteAsUser(DeletionAsUserContext ctx) {
String user = ctx.getUser();
Path dir = ctx.getSubDir();
List<Path> baseDirs = ctx.getBasedirs();
verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user);
@ -502,7 +525,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
Integer.toString(Commands.DELETE_AS_USER.getValue()),
dirString));
List<String> pathsToDelete = new ArrayList<String>();
if (baseDirs == null || baseDirs.length == 0) {
if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + dir);
pathsToDelete.add(dirString);
} else {
@ -533,10 +556,17 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
@Override
public boolean isContainerProcessAlive(String user, String pid)
public boolean isContainerProcessAlive(ContainerLivenessContext ctx)
throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
// Send a test signal to the process as the user to see if it's alive
return signalContainer(user, pid, Signal.NULL);
return signalContainer(new ContainerSignalContext.Builder()
.setUser(user)
.setPid(pid)
.setSignal(Signal.NULL)
.build());
}
public void mountCgroups(List<String> cgroupKVs, String hierarchy)

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
/**
* Windows secure container executor (WSCE).
@ -643,92 +644,94 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
return dst;
}
@Override
public void startLocalizer(Path nmPrivateContainerTokens,
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler) throws IOException,
InterruptedException {
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
Path classpathJarPrivateDir = dirsHandler.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR);
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs, user);
@Override
public void startLocalizer(LocalizerStartContext ctx) throws IOException,
InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn = String.format(
ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokens, tokenDst, user);
Path classpathJarPrivateDir = dirsHandler.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR);
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs, user);
File cwdApp = new File(appStorageDir.toString());
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("cwdApp: %s", cwdApp));
}
List<String> command ;
Path appStorageDir = getWorkingDir(localDirs, user, appId);
command = new ArrayList<String>();
String tokenFn = String.format(
ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
//use same jvm as parent
File jvm = new File(
new File(System.getProperty("java.home"), "bin"), "java.exe");
command.add(jvm.toString());
Path cwdPath = new Path(cwdApp.getPath());
// Build a temp classpath jar. See ContainerLaunch.sanitizeEnv().
// Passing CLASSPATH explicitly is *way* too long for command line.
String classPath = System.getProperty("java.class.path");
Map<String, String> env = new HashMap<String, String>(System.getenv());
String jarCp[] = FileUtil.createJarWithClassPath(classPath,
classpathJarPrivateDir, cwdPath, env);
String classPathJar = localizeClasspathJar(
new Path(jarCp[0]), cwdPath, user).toString();
command.add("-classpath");
command.add(classPathJar + jarCp[1]);
File cwdApp = new File(appStorageDir.toString());
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("cwdApp: %s", cwdApp));
}
String javaLibPath = System.getProperty("java.library.path");
if (javaLibPath != null) {
command.add("-Djava.library.path=" + javaLibPath);
}
command.addAll(ContainerLocalizer.getJavaOpts(getConf()));
List<String> command ;
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
localDirs);
String cmdLine = StringUtils.join(command, " ");
String localizerPid = String.format(LOCALIZER_PID_FORMAT, locId);
WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor(
cwdApp.getAbsolutePath(),
localizerPid, user, "nul:", cmdLine);
try {
stubExecutor.execute();
stubExecutor.validateResult();
}
finally {
stubExecutor.close();
try
{
killContainer(localizerPid, Signal.KILL);
}
catch(Throwable e) {
LOG.warn(String.format(
"An exception occured during the cleanup of localizer job %s:%n%s",
localizerPid,
org.apache.hadoop.util.StringUtils.stringifyException(e)));
}
}
}
@Override
command = new ArrayList<String>();
//use same jvm as parent
File jvm = new File(
new File(System.getProperty("java.home"), "bin"), "java.exe");
command.add(jvm.toString());
Path cwdPath = new Path(cwdApp.getPath());
// Build a temp classpath jar. See ContainerLaunch.sanitizeEnv().
// Passing CLASSPATH explicitly is *way* too long for command line.
String classPath = System.getProperty("java.class.path");
Map<String, String> env = new HashMap<String, String>(System.getenv());
String jarCp[] = FileUtil.createJarWithClassPath(classPath,
classpathJarPrivateDir, cwdPath, env);
String classPathJar = localizeClasspathJar(
new Path(jarCp[0]), cwdPath, user).toString();
command.add("-classpath");
command.add(classPathJar + jarCp[1]);
String javaLibPath = System.getProperty("java.library.path");
if (javaLibPath != null) {
command.add("-Djava.library.path=" + javaLibPath);
}
command.addAll(ContainerLocalizer.getJavaOpts(getConf()));
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
localDirs);
String cmdLine = StringUtils.join(command, " ");
String localizerPid = String.format(LOCALIZER_PID_FORMAT, locId);
WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor(
cwdApp.getAbsolutePath(),
localizerPid, user, "nul:", cmdLine);
try {
stubExecutor.execute();
stubExecutor.validateResult();
} finally {
stubExecutor.close();
try
{
killContainer(localizerPid, Signal.KILL);
}
catch(Throwable e) {
LOG.warn(String.format(
"An exception occured during the cleanup of localizer job %s:%n%s",
localizerPid,
org.apache.hadoop.util.StringUtils.stringifyException(e)));
}
}
}
@Override
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
String containerIdStr, String userName, Path pidFile, Resource resource,
File wordDir, Map<String, String> environment) throws IOException {

View File

@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.WindowsSecureContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
@ -299,9 +301,16 @@ public class ContainerLaunch implements Callable<Integer> {
}
else {
exec.activateContainer(containerID, pidFilePath);
ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
nmPrivateTokensPath, user, appIdStr, containerWorkDir,
localDirs, logDirs);
ret = exec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
.setNmPrivateTokensPath(nmPrivateTokensPath)
.setUser(user)
.setAppId(appIdStr)
.setContainerWorkDir(containerWorkDir)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.build());
}
} catch (Throwable e) {
LOG.warn("Failed to launch container.", e);
@ -416,7 +425,12 @@ public class ContainerLaunch implements Callable<Integer> {
? Signal.TERM
: Signal.KILL;
boolean result = exec.signalContainer(user, processId, signal);
boolean result = exec.signalContainer(
new ContainerSignalContext.Builder()
.setUser(user)
.setPid(processId)
.setSignal(signal)
.build());
LOG.debug("Sent signal " + signal + " to pid " + processId
+ " as user " + user

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
@ -80,7 +81,11 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
String pidPathStr = pidFile.getPath();
pidFilePath = new Path(pidPathStr);
exec.activateContainer(containerId, pidFilePath);
retCode = exec.reacquireContainer(container.getUser(), containerId);
retCode = exec.reacquireContainer(
new ContainerReacquisitionContext.Builder()
.setUser(container.getUser())
.setContainerId(containerId)
.build());
} else {
LOG.warn("Unable to locate pid file for container " + containerIdStr);
}

View File

@ -124,6 +124,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@ -1135,13 +1136,15 @@ public class ResourceLocalizationService extends CompositeService
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
ConverterUtils.toString(
context.getContainerId().
getApplicationAttemptId().getApplicationId()),
localizerId,
dirsHandler);
exec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(localizationServerAddress)
.setUser(context.getUser())
.setAppId(ConverterUtils.toString(context.getContainerId()
.getApplicationAttemptId().getApplicationId()))
.setLocId(localizerId)
.setDirsHandler(dirsHandler)
.build());
} else {
throw new IOException("All disks failed. "
+ dirsHandler.getDisksHealthReport(false));

View File

@ -0,0 +1,70 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Encapsulates information required for container liveness checks.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerLivenessContext {
private final String user;
private final String pid;
public static final class Builder {
private String user;
private String pid;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setPid(String pid) {
this.pid = pid;
return this;
}
public ContainerLivenessContext build() {
return new ContainerLivenessContext(this);
}
}
private ContainerLivenessContext(Builder builder) {
this.user = builder.user;
this.pid = builder.pid;
}
public String getUser() {
return this.user;
}
public String getPid() {
return this.pid;
}
}

View File

@ -0,0 +1,71 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Encapsulates information required for container reacquisition.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerReacquisitionContext {
private final String user;
private final ContainerId containerId;
public static final class Builder {
private String user;
private ContainerId containerId;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setContainerId(ContainerId containerId) {
this.containerId = containerId;
return this;
}
public ContainerReacquisitionContext build() {
return new ContainerReacquisitionContext(this);
}
}
private ContainerReacquisitionContext(Builder builder) {
this.user = builder.user;
this.containerId = builder.containerId;
}
public String getUser() {
return this.user;
}
public ContainerId getContainerId() {
return this.containerId;
}
}

View File

@ -0,0 +1,83 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
/**
* Encapsulates information required for container signaling.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerSignalContext {
private final String user;
private final String pid;
private final Signal signal;
public static final class Builder {
private String user;
private String pid;
private Signal signal;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setPid(String pid) {
this.pid = pid;
return this;
}
public Builder setSignal(Signal signal) {
this.signal = signal;
return this;
}
public ContainerSignalContext build() {
return new ContainerSignalContext(this);
}
}
private ContainerSignalContext(Builder builder) {
this.user = builder.user;
this.pid = builder.pid;
this.signal = builder.signal;
}
public String getUser() {
return this.user;
}
public String getPid() {
return this.pid;
}
public Signal getSignal() {
return this.signal;
}
}

View File

@ -0,0 +1,147 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import java.util.List;
/**
* Encapsulates information required for starting/launching containers.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ContainerStartContext {
private final Container container;
private final Path nmPrivateContainerScriptPath;
private final Path nmPrivateTokensPath;
private final String user;
private final String appId;
private final Path containerWorkDir;
private final List<String> localDirs;
private final List<String> logDirs;
public static final class Builder {
private Container container;
private Path nmPrivateContainerScriptPath;
private Path nmPrivateTokensPath;
private String user;
private String appId;
private Path containerWorkDir;
private List<String> localDirs;
private List<String> logDirs;
public Builder() {
}
public Builder setContainer(Container container) {
this.container = container;
return this;
}
public Builder setNmPrivateContainerScriptPath(
Path nmPrivateContainerScriptPath) {
this.nmPrivateContainerScriptPath = nmPrivateContainerScriptPath;
return this;
}
public Builder setNmPrivateTokensPath(Path nmPrivateTokensPath) {
this.nmPrivateTokensPath = nmPrivateTokensPath;
return this;
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setAppId(String appId) {
this.appId = appId;
return this;
}
public Builder setContainerWorkDir(Path containerWorkDir) {
this.containerWorkDir = containerWorkDir;
return this;
}
public Builder setLocalDirs(List<String> localDirs) {
this.localDirs = localDirs;
return this;
}
public Builder setLogDirs(List<String> logDirs) {
this.logDirs = logDirs;
return this;
}
public ContainerStartContext build() {
return new ContainerStartContext(this);
}
}
private ContainerStartContext(Builder builder) {
this.container = builder.container;
this.nmPrivateContainerScriptPath = builder.nmPrivateContainerScriptPath;
this.nmPrivateTokensPath = builder.nmPrivateTokensPath;
this.user = builder.user;
this.appId = builder.appId;
this.containerWorkDir = builder.containerWorkDir;
this.localDirs = builder.localDirs;
this.logDirs = builder.logDirs;
}
public Container getContainer() {
return this.container;
}
public Path getNmPrivateContainerScriptPath() {
return this.nmPrivateContainerScriptPath;
}
public Path getNmPrivateTokensPath() {
return this.nmPrivateTokensPath;
}
public String getUser() {
return this.user;
}
public String getAppId() {
return this.appId;
}
public Path getContainerWorkDir() {
return this.containerWorkDir;
}
public List<String> getLocalDirs() {
return this.localDirs;
}
public List<String> getLogDirs() {
return this.logDirs;
}
}

View File

@ -0,0 +1,91 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Encapsulates information required for deletions as a given user.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class DeletionAsUserContext {
private final String user;
private final Path subDir;
private final List<Path> basedirs;
public static final class Builder {
private String user;
private Path subDir;
private List<Path> basedirs;
public Builder() {
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setSubDir(Path subDir) {
this.subDir = subDir;
return this;
}
public Builder setBasedirs(Path... basedirs) {
this.basedirs = Arrays.asList(basedirs);
return this;
}
public DeletionAsUserContext build() {
return new DeletionAsUserContext(this);
}
}
private DeletionAsUserContext(Builder builder) {
this.user = builder.user;
this.subDir = builder.subDir;
this.basedirs = builder.basedirs;
}
public String getUser() {
return this.user;
}
public Path getSubDir() {
return this.subDir;
}
public List<Path> getBasedirs() {
if (this.basedirs != null) {
return Collections.unmodifiableList(this.basedirs);
} else {
return null;
}
}
}

View File

@ -0,0 +1,122 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import java.net.InetSocketAddress;
/**
* Encapsulates information required for starting a localizer.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class LocalizerStartContext {
private final Path nmPrivateContainerTokens;
private final InetSocketAddress nmAddr;
private final String user;
private final String appId;
private final String locId;
private final LocalDirsHandlerService dirsHandler;
public static final class Builder {
private Path nmPrivateContainerTokens;
private InetSocketAddress nmAddr;
private String user;
private String appId;
private String locId;
private LocalDirsHandlerService dirsHandler;
public Builder() {
}
public Builder setNmPrivateContainerTokens(Path nmPrivateContainerTokens) {
this.nmPrivateContainerTokens = nmPrivateContainerTokens;
return this;
}
public Builder setNmAddr(InetSocketAddress nmAddr) {
this.nmAddr = nmAddr;
return this;
}
public Builder setUser(String user) {
this.user = user;
return this;
}
public Builder setAppId(String appId) {
this.appId = appId;
return this;
}
public Builder setLocId(String locId) {
this.locId = locId;
return this;
}
public Builder setDirsHandler(LocalDirsHandlerService dirsHandler) {
this.dirsHandler = dirsHandler;
return this;
}
public LocalizerStartContext build() {
return new LocalizerStartContext(this);
}
}
private LocalizerStartContext(Builder builder) {
this.nmPrivateContainerTokens = builder.nmPrivateContainerTokens;
this.nmAddr = builder.nmAddr;
this.user = builder.user;
this.appId = builder.appId;
this.locId = builder.locId;
this.dirsHandler = builder.dirsHandler;
}
public Path getNmPrivateContainerTokens() {
return this.nmPrivateContainerTokens;
}
public InetSocketAddress getNmAddr() {
return this.nmAddr;
}
public String getUser() {
return this.user;
}
public String getAppId() {
return this.appId;
}
public String getLocId() {
return this.locId;
}
public LocalDirsHandlerService getDirsHandler() {
return this.dirsHandler;
}
}

View File

@ -72,6 +72,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -309,13 +312,26 @@ public class TestDefaultContainerExecutor {
mockExec.init();
mockExec.activateContainer(cId, pidFile);
int ret = mockExec
.launchContainer(container, scriptPath, tokensPath, appSubmitter,
appId, workDir, localDirs, localDirs);
int ret = mockExec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.build());
Assert.assertNotSame(0, ret);
} finally {
mockExec.deleteAsUser(appSubmitter, localDir);
mockExec.deleteAsUser(appSubmitter, logDir);
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(localDir)
.build());
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(logDir)
.build());
}
}
@ -410,14 +426,29 @@ public class TestDefaultContainerExecutor {
when(dirsHandler.getLogDirs()).thenReturn(logDirs);
try {
mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
appSubmitter, appId, locId, dirsHandler);
mockExec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(localizationServerAddress)
.setUser(appSubmitter)
.setAppId(appId)
.setLocId(locId)
.setDirsHandler(dirsHandler)
.build());
} catch (IOException e) {
Assert.fail("StartLocalizer failed to copy token file " + e);
} finally {
mockExec.deleteAsUser(appSubmitter, firstDir);
mockExec.deleteAsUser(appSubmitter, secondDir);
mockExec.deleteAsUser(appSubmitter, logDir);
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(firstDir)
.build());
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(secondDir)
.build());
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(logDir)
.build());
deleteTmpFiles();
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.junit.AfterClass;
import org.junit.Test;
@ -80,14 +81,28 @@ public class TestDeletionService {
static class FakeDefaultContainerExecutor extends DefaultContainerExecutor {
@Override
public void deleteAsUser(String user, Path subDir, Path... basedirs)
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
String user = ctx.getUser();
Path subDir = ctx.getSubDir();
List<Path> basedirs = ctx.getBasedirs();
if ((Long.parseLong(subDir.getName()) % 2) == 0) {
assertNull(user);
} else {
assertEquals("dingo", user);
}
super.deleteAsUser(user, subDir, basedirs);
DeletionAsUserContext.Builder builder = new DeletionAsUserContext
.Builder()
.setUser(user)
.setSubDir(subDir);
if (basedirs != null) {
builder.setBasedirs(basedirs.toArray(new Path[basedirs.size()]));
}
super.deleteAsUser(builder.build());
assertFalse(lfs.util().exists(subDir));
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -179,9 +180,16 @@ public class TestDockerContainerExecutor {
Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
return exec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
}
// Write the script used to launch the docker container in a temp file

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -149,9 +150,16 @@ public class TestDockerContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
dockerContainerExecutor.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
}
@Test(expected = IllegalArgumentException.class)
@ -185,9 +193,17 @@ public class TestDockerContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
dockerContainerExecutor.launchContainer(
new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
}
@Test
@ -219,9 +235,17 @@ public class TestDockerContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid");
dockerContainerExecutor.activateContainer(cId, pidFile);
int ret = dockerContainerExecutor.launchContainer(container, scriptPath,
tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
int ret = dockerContainerExecutor.launchContainer(
new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
assertEquals(0, ret);
//get the script
Path sessionScriptPath = new Path(workDir,

View File

@ -60,6 +60,11 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.junit.After;
import org.junit.Assert;
@ -208,7 +213,10 @@ public class TestLinuxContainerExecutor {
Path usercachedir = new Path(dir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usercachedir, user);
Path appcachedir = new Path(userdir, ContainerLocalizer.APPCACHE);
exec.deleteAsUser(user, appcachedir);
exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(appcachedir)
.build());
FileContext.getLocalFSFileContext().delete(usercachedir, true);
}
}
@ -218,7 +226,10 @@ public class TestLinuxContainerExecutor {
for (String dir : localDirs) {
Path filecache = new Path(dir, ContainerLocalizer.FILECACHE);
Path filedir = new Path(filecache, user);
exec.deleteAsUser(user, filedir);
exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(filedir)
.build());
}
}
@ -229,7 +240,10 @@ public class TestLinuxContainerExecutor {
String containerId = "CONTAINER_" + (id - 1);
Path appdir = new Path(dir, appId);
Path containerdir = new Path(appdir, containerId);
exec.deleteAsUser(user, containerdir);
exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(containerdir)
.build());
}
}
@ -244,7 +258,11 @@ public class TestLinuxContainerExecutor {
for (String file : files) {
File f = new File(workSpace, file);
if (f.exists()) {
exec.deleteAsUser(user, new Path(file), ws);
exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(new Path(file))
.setBasedirs(ws)
.build());
}
}
}
@ -310,9 +328,16 @@ public class TestLinuxContainerExecutor {
Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
return exec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
}
@Test
@ -345,8 +370,14 @@ public class TestLinuxContainerExecutor {
};
exec.setConf(conf);
exec.startLocalizer(nmPrivateContainerTokensPath, nmAddr, appSubmitter,
appId, locId, dirsHandler);
exec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateContainerTokensPath)
.setNmAddr(nmAddr)
.setUser(appSubmitter)
.setAppId(appId)
.setLocId(locId)
.setDirsHandler(dirsHandler)
.build());
String locId2 = "container_01_02";
Path nmPrivateContainerTokensPath2 =
@ -355,8 +386,16 @@ public class TestLinuxContainerExecutor {
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId2));
files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
exec.startLocalizer(nmPrivateContainerTokensPath2, nmAddr, appSubmitter,
appId, locId2, dirsHandler);
exec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateContainerTokensPath2)
.setNmAddr(nmAddr)
.setUser(appSubmitter)
.setAppId(appId)
.setLocId(locId2)
.setDirsHandler(dirsHandler)
.build());
cleanupUserAppCache(appSubmitter);
}
@ -429,7 +468,11 @@ public class TestLinuxContainerExecutor {
assertNotNull(pid);
LOG.info("Going to killing the process.");
exec.signalContainer(appSubmitter, pid, Signal.TERM);
exec.signalContainer(new ContainerSignalContext.Builder()
.setUser(appSubmitter)
.setPid(pid)
.setSignal(Signal.TERM)
.build());
LOG.info("sleeping for 100ms to let the sleep be killed");
Thread.sleep(100);
@ -586,7 +629,10 @@ public class TestLinuxContainerExecutor {
} catch (IOException e) {
// expected if LCE isn't setup right, but not necessary for this test
}
lce.reacquireContainer("foouser", cid);
lce.reacquireContainer(new ContainerReacquisitionContext.Builder()
.setUser("foouser")
.setContainerId(cid)
.build());
assertTrue("postExec not called after reacquisition",
TestResourceHandler.postExecContainers.contains(cid));
}

View File

@ -49,6 +49,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
@ -130,9 +134,16 @@ public class TestLinuxContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt");
mockExec.activateContainer(cId, pidFile);
int ret = mockExec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
int ret = mockExec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
assertEquals(0, ret);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, appId, containerId,
@ -184,7 +195,15 @@ public class TestLinuxContainerExecutorWithMocks {
Path nmPrivateCTokensPath= new Path("file:///bin/nmPrivateCTokensPath");
try {
mockExec.startLocalizer(nmPrivateCTokensPath, address, "test", "application_0", "12345", dirsHandler);
mockExec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(address)
.setUser("test")
.setAppId("application_0")
.setLocId("12345")
.setDirsHandler(dirsHandler)
.build());
List<String> result=readMockParams();
Assert.assertEquals(result.size(), 18);
Assert.assertEquals(result.get(0), YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
@ -277,9 +296,17 @@ public class TestLinuxContainerExecutorWithMocks {
Path pidFile = new Path(workDir, "pid.txt");
mockExec.activateContainer(cId, pidFile);
int ret = mockExec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
int ret = mockExec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
Assert.assertNotSame(0, ret);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, appId, containerId,
@ -307,7 +334,11 @@ public class TestLinuxContainerExecutorWithMocks {
ContainerExecutor.Signal signal = ContainerExecutor.Signal.QUIT;
String sigVal = String.valueOf(signal.getValue());
mockExec.signalContainer(appSubmitter, "1000", signal);
mockExec.signalContainer(new ContainerSignalContext.Builder()
.setUser(appSubmitter)
.setPid("1000")
.setSignal(signal)
.build());
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "1000", sigVal),
readMockParams());
@ -323,24 +354,41 @@ public class TestLinuxContainerExecutorWithMocks {
Path baseDir0 = new Path("/grid/0/BaseDir");
Path baseDir1 = new Path("/grid/1/BaseDir");
mockExec.deleteAsUser(appSubmitter, dir);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "/tmp/testdir"),
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(dir)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "/tmp/testdir"),
readMockParams());
mockExec.deleteAsUser(appSubmitter, null);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, ""),
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, ""),
readMockParams());
mockExec.deleteAsUser(appSubmitter, testFile, baseDir0, baseDir1);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, testFile.toString(), baseDir0.toString(), baseDir1.toString()),
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(testFile)
.setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, testFile.toString(), baseDir0.toString(),
baseDir1.toString()),
readMockParams());
mockExec.deleteAsUser(appSubmitter, null, baseDir0, baseDir1);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "", baseDir0.toString(), baseDir1.toString()),
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "", baseDir0.toString(), baseDir1.toString()),
readMockParams());
File f = new File("./src/test/resources/mock-container-executer-with-error");
@ -352,22 +400,38 @@ public class TestLinuxContainerExecutorWithMocks {
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
mockExec.setConf(conf);
mockExec.deleteAsUser(appSubmitter, dir);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "/tmp/testdir"),
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(dir)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "/tmp/testdir"),
readMockParams());
mockExec.deleteAsUser(appSubmitter, null);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, ""),
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, ""),
readMockParams());
mockExec.deleteAsUser(appSubmitter, testFile, baseDir0, baseDir1);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, testFile.toString(), baseDir0.toString(), baseDir1.toString()),
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setSubDir(testFile)
.setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(
Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, testFile.toString(), baseDir0.toString(),
baseDir1.toString()),
readMockParams());
mockExec.deleteAsUser(appSubmitter, null, baseDir0, baseDir1);
mockExec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(appSubmitter)
.setBasedirs(baseDir0, baseDir1)
.build());
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "", baseDir0.toString(), baseDir1.toString()),
readMockParams());

View File

@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -255,8 +256,11 @@ public abstract class BaseContainerManagerTest {
if (containerManager != null) {
containerManager.stop();
}
createContainerExecutor().deleteAsUser(user,
new Path(localDir.getAbsolutePath()), new Path[] {});
createContainerExecutor().deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(new Path(localDir.getAbsolutePath()))
.setBasedirs(new Path[] {})
.build());
}
public static void waitForContainerState(ContainerManagementProtocol containerManager,

View File

@ -63,6 +63,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert;
import org.apache.commons.io.FileUtils;
@ -939,11 +940,16 @@ public class TestResourceLocalizationService {
dispatcher.await();
String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerId().toString();
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
verify(exec).startLocalizer(tokenPathCaptor.capture(),
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
isA(LocalDirsHandlerService.class));
Path localizationTokenPath = tokenPathCaptor.getValue();
ArgumentCaptor<LocalizerStartContext> contextCaptor = ArgumentCaptor
.forClass(LocalizerStartContext.class);
verify(exec).startLocalizer(contextCaptor.capture());
LocalizerStartContext context = contextCaptor.getValue();
Path localizationTokenPath = context.getNmPrivateContainerTokens();
assertEquals("user0", context.getUser());
assertEquals(appStr, context.getAppId());
assertEquals(ctnrStr, context.getLocId());
// heartbeat from localizer
LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Tes
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@ -165,8 +166,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
@Override
public void tearDown() throws IOException, InterruptedException {
super.tearDown();
createContainerExecutor().deleteAsUser(user,
new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {});
createContainerExecutor().deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(user)
.setSubDir(new Path(remoteRootLogDir.getAbsolutePath()))
.setBasedirs(new Path[] {})
.build());
dispatcher.await();
dispatcher.stop();
dispatcher.close();

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@ -288,8 +289,11 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
// Assert that the process is not alive anymore
Assert.assertFalse("Process is still alive!",
exec.signalContainer(user,
pid, Signal.NULL));
exec.signalContainer(new ContainerSignalContext.Builder()
.setUser(user)
.setPid(pid)
.setSignal(Signal.NULL)
.build()));
}
@Test(timeout = 20000)