diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 29fc747f1aa..17aa7d9f62a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -74,6 +74,7 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskValidator; import org.apache.hadoop.util.DiskValidatorFactory; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; @@ -808,6 +809,7 @@ public class ResourceLocalizationService extends CompositeService return; // ignore; already gone } privLocalizers.remove(locId); + LOG.info("Interrupting localizer for " + locId); localizer.interrupt(); } } @@ -1188,6 +1190,34 @@ public class ResourceLocalizationService extends CompositeService dirPath, delService); } + @Override + public void interrupt() { + boolean destroyedShell = false; + try { + for (Shell shell : Shell.getAllShells()) { + try { + if (shell.getWaitingThread() != null && + shell.getWaitingThread().equals(this) && + shell.getProcess() != null && + shell.getProcess().isAlive()) { + LOG.info("Destroying localization shell process for " + + localizerId); + shell.getProcess().destroy(); + destroyedShell = true; + break; + } + } catch (Exception e) { + LOG.warn("Failed to destroy localization shell process for " + + localizerId, e); + } + } + } finally { + if (!destroyedShell) { + super.interrupt(); + } + } + } + @Override @SuppressWarnings("unchecked") // dispatcher not typed public void run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index d863c6ad4e8..c180545cd45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -66,6 +66,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; @@ -1196,6 +1197,149 @@ public class TestResourceLocalizationService { } } + private static class DummyShellExecutor extends DefaultContainerExecutor { + private AtomicInteger numLocalizers = new AtomicInteger(0); + @Override + public void startLocalizer(LocalizerStartContext ctx) throws IOException, + InterruptedException { + numLocalizers.incrementAndGet(); + Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor( + new String[]{"bash", "-c", "sleep 300"}); + try { + shexec.execute(); + Assert.fail("Shell finished without being interrupted"); + } catch (IOException e) { + System.out.println("Got expected exception executing shell " + + e.toString()); + } + numLocalizers.decrementAndGet(); + } + private void waitForLocalizers(int num) { + while (numLocalizers.intValue() != num) { + Thread.yield(); + } + } + private void waitForShellCount(int num) { + while (Shell.getAllShells().size() != num) { + Thread.yield(); + } + } + } + + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testShellDestroyedOnContainerKill() throws Exception { + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + localDirs.add(lfs.makeQualified(new Path(basedir, 0 + ""))); + sDirs[0] = localDirs.get(0).toString(); + + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + + DummyShellExecutor exec = new DummyShellExecutor(); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + DeletionService delServiceReal = new DeletionService(exec); + DeletionService delService = spy(delServiceReal); + delService.init(new Configuration()); + delService.start(); + + DrainDispatcher dispatcher = getDispatcher(conf); + ResourceLocalizationService rawService = new ResourceLocalizationService( + dispatcher, exec, delService, dirsHandler, nmContext, metrics); + + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration + .class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPermission = + ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + final Path userDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.USERCACHE); + final Path fileDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.FILECACHE); + final Path sysDir = + new Path(sDirs[0].substring("file:".length()), + ResourceLocalizationService.NM_PRIVATE_DIR); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", new Path(sDirs[0])); + final FileStatus nmFs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + nmPermission, "", "", sysDir); + + doAnswer(new Answer() { + @Override + public FileStatus answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + if (args.length > 0) { + if (args[0].equals(userDir) || args[0].equals(fileDir)) { + return fs; + } + } + return nmFs; + } + }).when(spylfs).getFileStatus(isA(Path.class)); + + try { + spyService.init(conf); + spyService.start(); + + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + String user = "user0"; + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + List resources = initializeLocalizer(appId); + LocalResource resource1 = resources.get(0); + final Container c1 = getMockContainer(appId, 42, "user0"); + + EventHandler applicationBus = + getApplicationBus(dispatcher); + EventHandler containerBus = getContainerBus(dispatcher); + initApp(spyService, applicationBus, app, appId, dispatcher); + + // Send localization request for container c1. + final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + Map> rsrcs = + new HashMap<>(); + List privateResourceList = + new ArrayList<>(); + privateResourceList.add(req1); + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); + spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs)); + + // Wait for localizer of container c1 to begin. + exec.waitForLocalizers(1); + exec.waitForShellCount(1); + LocalizerRunner localizerRunner = + spyService.getLocalizerRunner(c1.getContainerId().toString()); + + // Container c1 is killed which leads to cleanup + spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs)); + dispatcher.await(); + + // Wait for localizer of container c1 to stop. + exec.waitForShellCount(0); + exec.waitForLocalizers(0); + + // Check that the thread is no longer running + while (localizerRunner.isAlive()) { + Thread.sleep(10); + } + } finally { + spyService.stop(); + dispatcher.stop(); + delService.stop(); + } + } + private DrainDispatcher getDispatcher(Configuration config) { DrainDispatcher dispatcher = new DrainDispatcher(); dispatcher.init(config);