diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 17aa7d9f62a..29fc747f1aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -74,7 +74,6 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskValidator; import org.apache.hadoop.util.DiskValidatorFactory; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; @@ -809,7 +808,6 @@ public class ResourceLocalizationService extends CompositeService return; // ignore; already gone } privLocalizers.remove(locId); - LOG.info("Interrupting localizer for " + locId); localizer.interrupt(); } } @@ -1190,34 +1188,6 @@ public class ResourceLocalizationService extends CompositeService dirPath, delService); } - @Override - public void interrupt() { - boolean destroyedShell = false; - try { - for (Shell shell : Shell.getAllShells()) { - try { - if (shell.getWaitingThread() != null && - shell.getWaitingThread().equals(this) && - shell.getProcess() != null && - shell.getProcess().isAlive()) { - LOG.info("Destroying localization shell process for " + - localizerId); - shell.getProcess().destroy(); - destroyedShell = true; - break; - } - } catch (Exception e) { - LOG.warn("Failed to destroy localization shell process for " + - localizerId, e); - } - } - } finally { - if (!destroyedShell) { - super.interrupt(); - } - } - } - @Override @SuppressWarnings("unchecked") // dispatcher not typed public void run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index c180545cd45..d863c6ad4e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -66,7 +66,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.Options; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; @@ -1197,149 +1196,6 @@ public class TestResourceLocalizationService { } } - private static class DummyShellExecutor extends DefaultContainerExecutor { - private AtomicInteger numLocalizers = new AtomicInteger(0); - @Override - public void startLocalizer(LocalizerStartContext ctx) throws IOException, - InterruptedException { - numLocalizers.incrementAndGet(); - Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor( - new String[]{"bash", "-c", "sleep 300"}); - try { - shexec.execute(); - Assert.fail("Shell finished without being interrupted"); - } catch (IOException e) { - System.out.println("Got expected exception executing shell " + - e.toString()); - } - numLocalizers.decrementAndGet(); - } - private void waitForLocalizers(int num) { - while (numLocalizers.intValue() != num) { - Thread.yield(); - } - } - private void waitForShellCount(int num) { - while (Shell.getAllShells().size() != num) { - Thread.yield(); - } - } - } - - @Test(timeout = 60000) - @SuppressWarnings("unchecked") - public void testShellDestroyedOnContainerKill() throws Exception { - List localDirs = new ArrayList(); - String[] sDirs = new String[1]; - localDirs.add(lfs.makeQualified(new Path(basedir, 0 + ""))); - sDirs[0] = localDirs.get(0).toString(); - - conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); - - DummyShellExecutor exec = new DummyShellExecutor(); - LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); - dirsHandler.init(conf); - - DeletionService delServiceReal = new DeletionService(exec); - DeletionService delService = spy(delServiceReal); - delService.init(new Configuration()); - delService.start(); - - DrainDispatcher dispatcher = getDispatcher(conf); - ResourceLocalizationService rawService = new ResourceLocalizationService( - dispatcher, exec, delService, dirsHandler, nmContext, metrics); - - ResourceLocalizationService spyService = spy(rawService); - doReturn(mockServer).when(spyService).createServer(); - doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration - .class)); - FsPermission defaultPermission = - FsPermission.getDirDefault().applyUMask(lfs.getUMask()); - FsPermission nmPermission = - ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); - final Path userDir = - new Path(sDirs[0].substring("file:".length()), - ContainerLocalizer.USERCACHE); - final Path fileDir = - new Path(sDirs[0].substring("file:".length()), - ContainerLocalizer.FILECACHE); - final Path sysDir = - new Path(sDirs[0].substring("file:".length()), - ResourceLocalizationService.NM_PRIVATE_DIR); - final FileStatus fs = - new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, - defaultPermission, "", "", new Path(sDirs[0])); - final FileStatus nmFs = - new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, - nmPermission, "", "", sysDir); - - doAnswer(new Answer() { - @Override - public FileStatus answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - if (args.length > 0) { - if (args[0].equals(userDir) || args[0].equals(fileDir)) { - return fs; - } - } - return nmFs; - } - }).when(spylfs).getFileStatus(isA(Path.class)); - - try { - spyService.init(conf); - spyService.start(); - - final Application app = mock(Application.class); - final ApplicationId appId = - BuilderUtils.newApplicationId(314159265358979L, 3); - String user = "user0"; - when(app.getUser()).thenReturn(user); - when(app.getAppId()).thenReturn(appId); - List resources = initializeLocalizer(appId); - LocalResource resource1 = resources.get(0); - final Container c1 = getMockContainer(appId, 42, "user0"); - - EventHandler applicationBus = - getApplicationBus(dispatcher); - EventHandler containerBus = getContainerBus(dispatcher); - initApp(spyService, applicationBus, app, appId, dispatcher); - - // Send localization request for container c1. - final LocalResourceRequest req1 = new LocalResourceRequest(resource1); - Map> rsrcs = - new HashMap<>(); - List privateResourceList = - new ArrayList<>(); - privateResourceList.add(req1); - rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); - spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs)); - - // Wait for localizer of container c1 to begin. - exec.waitForLocalizers(1); - exec.waitForShellCount(1); - LocalizerRunner localizerRunner = - spyService.getLocalizerRunner(c1.getContainerId().toString()); - - // Container c1 is killed which leads to cleanup - spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs)); - dispatcher.await(); - - // Wait for localizer of container c1 to stop. - exec.waitForShellCount(0); - exec.waitForLocalizers(0); - - // Check that the thread is no longer running - while (localizerRunner.isAlive()) { - Thread.sleep(10); - } - } finally { - spyService.stop(); - dispatcher.stop(); - delService.stop(); - } - } - private DrainDispatcher getDispatcher(Configuration config) { DrainDispatcher dispatcher = new DrainDispatcher(); dispatcher.init(config);