diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a83b3945eb8..7fc4f1c4e16 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1506,6 +1506,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3141. Fix the broken MRAppMaster to work over YARN in security mode.(vinodkv) + MAPREDUCE-2751. Modified NodeManager to stop leaving around local files + after application finishes. (Siddharth Seth via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 83872876797..40ef62a18c1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -199,13 +199,18 @@ public class DefaultContainerExecutor extends ContainerExecutor { throws IOException, InterruptedException { if (baseDirs == null || baseDirs.length == 0) { LOG.info("Deleting absolute path : " + subDir); - lfs.delete(subDir, true); + if (!lfs.delete(subDir, true)) { + //Maybe retry + LOG.warn("delete returned false for path: [" + subDir + "]"); + } return; } for (Path baseDir : baseDirs) { Path del = subDir == null ? baseDir : new Path(baseDir, subDir); LOG.info("Deleting path : " + del); - lfs.delete(del, true); + if (!lfs.delete(del, true)) { + LOG.warn("delete returned false for path: [" + del + "]"); + } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 346e79e7a79..0a9c07705de 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.*; @@ -125,6 +124,7 @@ public class DeletionService extends AbstractService { } } else { try { + LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]"); exec.deleteAsUser(user, subDir, baseDirs); } catch (IOException e) { LOG.warn("Failed to delete as user " + user, e); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3c92c0b53c5..23ee6482127 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -286,6 +286,8 @@ public class ContainerManagerImpl extends CompositeService implements StartContainerResponse response = recordFactory.newRecordInstance(StartContainerResponse.class); response.addAllServiceResponse(auxiluaryServices.getMeta()); + // TODO launchedContainer misplaced -> doesn't necessarily mean a container + // launch. A finished Application will not launch containers. metrics.launchedContainer(); metrics.allocateContainer(launchContext.getResource()); return response; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index f7116958afd..eebd1f152ec 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -152,6 +152,7 @@ public class ApplicationImpl implements Application { /** * Notify services of new application. */ + @SuppressWarnings("unchecked") static class AppInitTransition implements SingleArcTransition { @Override @@ -180,6 +181,7 @@ public class ApplicationImpl implements Application { } } + @SuppressWarnings("unchecked") static class AppInitDoneTransition implements SingleArcTransition { @Override @@ -199,6 +201,7 @@ public class ApplicationImpl implements Application { } } + @SuppressWarnings("unchecked") static class DuplicateAppInitTransition implements SingleArcTransition { @Override @@ -229,6 +232,7 @@ public class ApplicationImpl implements Application { } } + @SuppressWarnings("unchecked") void handleAppFinishWithContainersCleanedup() { // Delete Application level resources this.dispatcher.getEventHandler().handle( @@ -238,6 +242,7 @@ public class ApplicationImpl implements Application { // TODO: Trigger the LogsManager } + @SuppressWarnings("unchecked") static class AppFinishTriggeredTransition implements MultipleArcTransition { @@ -286,6 +291,7 @@ public class ApplicationImpl implements Application { } + @SuppressWarnings("unchecked") static class AppCompletelyDoneTransition implements SingleArcTransition { @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 43afa4cb85e..117ede79a13 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -27,7 +27,6 @@ import java.io.OutputStream; import java.io.PrintStream; import java.util.ArrayList; import java.util.EnumSet; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -124,19 +123,18 @@ public class ContainerLaunch implements Callable { FileContext lfs = FileContext.getLocalFSFileContext(); LocalDirAllocator lDirAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO + Path nmPrivateContainerScriptPath = lDirAllocator.getLocalPathForWrite( - ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR - + appIdStr + Path.SEPARATOR + containerIdStr - + Path.SEPARATOR + CONTAINER_SCRIPT, this.conf); + getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + + CONTAINER_SCRIPT, this.conf); Path nmPrivateTokensPath = lDirAllocator.getLocalPathForWrite( - ResourceLocalizationService.NM_PRIVATE_DIR - + Path.SEPARATOR - + containerIdStr + getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, containerIdStr), this.conf); + DataOutputStream containerScriptOutStream = null; DataOutputStream tokensOutStream = null; @@ -229,6 +227,16 @@ public class ContainerLaunch implements Callable { return 0; } + private String getContainerPrivateDir(String appIdStr, String containerIdStr) { + return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr + + Path.SEPARATOR; + } + + private String getAppPrivateDir(String appIdStr) { + return ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR + + appIdStr; + } + private static class ShellScriptBuilder { private final StringBuilder sb; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 017431501f8..d758e885a89 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -37,6 +40,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even class LocalResourcesTrackerImpl implements LocalResourcesTracker { static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class); + private static final String RANDOM_DIR_REGEX = "-?\\d+"; + private static final Pattern RANDOM_DIR_PATTERN = Pattern + .compile(RANDOM_DIR_REGEX); private final String user; private final Dispatcher dispatcher; @@ -83,28 +89,44 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { @Override public boolean remove(LocalizedResource rem, DeletionService delService) { - // current synchronization guaranteed by crude RLS event for cleanup + // current synchronization guaranteed by crude RLS event for cleanup LocalizedResource rsrc = localrsrc.get(rem.getRequest()); if (null == rsrc) { - LOG.error("Attempt to remove absent resource: " + rem.getRequest() + - " from " + getUser()); + LOG.error("Attempt to remove absent resource: " + rem.getRequest() + + " from " + getUser()); return true; } if (rsrc.getRefCount() > 0 - || ResourceState.DOWNLOADING.equals(rsrc.getState()) - || rsrc != rem) { + || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) { // internal error - LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount"); + LOG.error("Attempt to remove resource: " + rsrc + + " with non-zero refcount"); assert false; return false; } - localrsrc.remove(rem.getRequest()); if (ResourceState.LOCALIZED.equals(rsrc.getState())) { - delService.delete(getUser(), rsrc.getLocalPath()); + delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); } return true; } + + /** + * Returns the path upto the random directory component. + */ + private Path getPathToDelete(Path localPath) { + Path delPath = localPath.getParent(); + String name = delPath.getName(); + Matcher matcher = RANDOM_DIR_PATTERN.matcher(name); + if (matcher.matches()) { + return delPath; + } else { + LOG.warn("Random directroy component did not match. " + + "Deleting localized path only"); + return localPath; + } + } + @Override public String getUser() { return user; @@ -114,5 +136,4 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { public Iterator iterator() { return localrsrc.values().iterator(); } - } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 89d523436cc..c6b68215a8e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -304,6 +304,7 @@ public class ResourceLocalizationService extends CompositeService retain.addResources(t); LOG.debug("Resource cleanup " + t.getUser() + ":" + retain); } + //TODO Check if appRsrcs should also be added to the retention set. break; case CLEANUP_CONTAINER_RESOURCES: ContainerLocalizationCleanupEvent rsrcCleanup = @@ -336,6 +337,7 @@ public class ResourceLocalizationService extends CompositeService delService.delete(userName, containerDir, new Path[] {}); // Delete the nmPrivate container-dir + Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); Path containerSysDir = new Path(appSysDir, containerIDStr); @@ -762,14 +764,16 @@ public class ResourceLocalizationService extends CompositeService @Override @SuppressWarnings("unchecked") // dispatcher not typed public void run() { + Path nmPrivateCTokensPath = null; try { // Use LocalDirAllocator to get nmPrivateDir - Path nmPrivateCTokensPath = + nmPrivateCTokensPath = localDirsSelector.getLocalPathForWrite( NM_PRIVATE_DIR + Path.SEPARATOR + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, localizerId), getConfig()); + // 0) init queue, etc. // 1) write credentials to private dir DataOutputStream tokenOut = null; @@ -811,6 +815,7 @@ public class ResourceLocalizationService extends CompositeService for (LocalizerResourceRequestEvent event : scheduled.values()) { event.getResource().unlock(); } + delService.delete(null, nmPrivateCTokensPath, new Path[] {}); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java new file mode 100644 index 00000000000..33bf85c64cf --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -0,0 +1,418 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +public class TestApplication { + + /** + * All container start events before application running. + */ + @Test + public void testApplicationInit1() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(1, 314159265358979L, "yak", 3); + wa.initApplication(1); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + assertEquals(1, wa.app.getContainers().size()); + wa.initApplication(0); + wa.initApplication(2); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + assertEquals(3, wa.app.getContainers().size()); + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + + for (int i = 0; i < wa.containers.size(); i++) { + verify(wa.containerBus).handle( + argThat(new ContainerInitMatcher(wa.containers.get(i) + .getContainerID()))); + } + } finally { + if (wa != null) + wa.finished(); + } + } + + /** + * Container start events after Application Running + */ + @Test + public void testApplicationInit2() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(2, 314159265358979L, "yak", 3); + wa.initApplication(0); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + assertEquals(1, wa.app.getContainers().size()); + + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + verify(wa.containerBus).handle( + argThat(new ContainerInitMatcher(wa.containers.get(0) + .getContainerID()))); + + wa.initApplication(1); + wa.initApplication(2); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(3, wa.app.getContainers().size()); + + for (int i = 1; i < wa.containers.size(); i++) { + verify(wa.containerBus).handle( + argThat(new ContainerInitMatcher(wa.containers.get(i) + .getContainerID()))); + } + } finally { + if (wa != null) + wa.finished(); + } + } + + /** + * App state RUNNING after all containers complete, before RM sends + * APP_FINISHED + */ + @Test + public void testAppRunningAfterContainersComplete() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(3, 314159265358979L, "yak", 3); + wa.initApplication(-1); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + + wa.containerFinished(0); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(2, wa.app.getContainers().size()); + + wa.containerFinished(1); + wa.containerFinished(2); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(0, wa.app.getContainers().size()); + } finally { + if (wa != null) + wa.finished(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testAppFinishedOnRunningContainers() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(4, 314159265358979L, "yak", 3); + wa.initApplication(-1); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + + wa.containerFinished(0); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(2, wa.app.getContainers().size()); + + wa.appFinished(); + assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, + wa.app.getApplicationState()); + assertEquals(2, wa.app.getContainers().size()); + + for (int i = 1; i < wa.containers.size(); i++) { + verify(wa.containerBus).handle( + argThat(new ContainerKillMatcher(wa.containers.get(i) + .getContainerID()))); + } + + wa.containerFinished(1); + assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, + wa.app.getApplicationState()); + assertEquals(1, wa.app.getContainers().size()); + + reset(wa.localizerBus); + wa.containerFinished(2); + // All containers finished. Cleanup should be called. + assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, + wa.app.getApplicationState()); + assertEquals(0, wa.app.getContainers().size()); + + verify(wa.localizerBus).handle( + refEq(new ApplicationLocalizationEvent( + LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); + + wa.appResourcesCleanedup(); + assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); + + } finally { + if (wa != null) + wa.finished(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testAppFinishedOnCompletedContainers() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(5, 314159265358979L, "yak", 3); + wa.initApplication(-1); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + + reset(wa.localizerBus); + wa.containerFinished(0); + wa.containerFinished(1); + wa.containerFinished(2); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(0, wa.app.getContainers().size()); + + wa.appFinished(); + assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, + wa.app.getApplicationState()); + + verify(wa.localizerBus).handle( + refEq(new ApplicationLocalizationEvent( + LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); + + wa.appResourcesCleanedup(); + assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); + } finally { + if (wa != null) + wa.finished(); + } + } + +//TODO Re-work after Application transitions are changed. +// @Test + @SuppressWarnings("unchecked") + public void testStartContainerAfterAppFinished() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(5, 314159265358979L, "yak", 3); + wa.initApplication(-1); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + + reset(wa.localizerBus); + wa.containerFinished(0); + wa.containerFinished(1); + wa.containerFinished(2); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(0, wa.app.getContainers().size()); + + wa.appFinished(); + assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, + wa.app.getApplicationState()); + verify(wa.localizerBus).handle( + refEq(new ApplicationLocalizationEvent( + LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); + + wa.appResourcesCleanedup(); + assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); + } finally { + if (wa != null) + wa.finished(); + } + } + +//TODO Re-work after Application transitions are changed. +// @Test + @SuppressWarnings("unchecked") + public void testAppFinishedOnIniting() { + // AM may send a startContainer() - AM APP_FINIHSED processed after + // APP_FINISHED on another NM + WrappedApplication wa = null; + try { + wa = new WrappedApplication(1, 314159265358979L, "yak", 3); + wa.initApplication(0); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + assertEquals(1, wa.app.getContainers().size()); + + reset(wa.localizerBus); + wa.appFinished(); + + verify(wa.containerBus).handle( + argThat(new ContainerKillMatcher(wa.containers.get(0) + .getContainerID()))); + assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, + wa.app.getApplicationState()); + + wa.containerFinished(0); + assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, + wa.app.getApplicationState()); + verify(wa.localizerBus).handle( + refEq(new ApplicationLocalizationEvent( + LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); + + wa.initApplication(1); + assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, + wa.app.getApplicationState()); + assertEquals(0, wa.app.getContainers().size()); + + wa.appResourcesCleanedup(); + assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); + } finally { + if (wa != null) + wa.finished(); + } + } + + private class ContainerKillMatcher extends ArgumentMatcher { + private ContainerId cId; + + public ContainerKillMatcher(ContainerId cId) { + this.cId = cId; + } + + @Override + public boolean matches(Object argument) { + if (argument instanceof ContainerKillEvent) { + ContainerKillEvent event = (ContainerKillEvent) argument; + return event.getContainerID().equals(cId); + } + return false; + } + } + + private class ContainerInitMatcher extends ArgumentMatcher { + private ContainerId cId; + + public ContainerInitMatcher(ContainerId cId) { + this.cId = cId; + } + + @Override + public boolean matches(Object argument) { + if (argument instanceof ContainerInitEvent) { + ContainerInitEvent event = (ContainerInitEvent) argument; + return event.getContainerID().equals(cId); + } + return false; + } + } + + @SuppressWarnings("unchecked") + private class WrappedApplication { + final DrainDispatcher dispatcher; + final EventHandler localizerBus; + final EventHandler launcherBus; + final EventHandler monitorBus; + final EventHandler auxBus; + final EventHandler containerBus; + final EventHandler logAggregationBus; + final String user; + final List containers; + + final ApplicationId appId; + final Application app; + + WrappedApplication(int id, long timestamp, String user, int numContainers) { + dispatcher = new DrainDispatcher(); + dispatcher.init(null); + + localizerBus = mock(EventHandler.class); + launcherBus = mock(EventHandler.class); + monitorBus = mock(EventHandler.class); + auxBus = mock(EventHandler.class); + containerBus = mock(EventHandler.class); + logAggregationBus = mock(EventHandler.class); + + dispatcher.register(LocalizationEventType.class, localizerBus); + dispatcher.register(ContainersLauncherEventType.class, launcherBus); + dispatcher.register(ContainersMonitorEventType.class, monitorBus); + dispatcher.register(AuxServicesEventType.class, auxBus); + dispatcher.register(ContainerEventType.class, containerBus); + dispatcher.register(LogAggregatorEventType.class, logAggregationBus); + + this.user = user; + this.appId = BuilderUtils.newApplicationId(timestamp, id); + + app = new ApplicationImpl(dispatcher, this.user, appId, null); + containers = new ArrayList(); + for (int i = 0; i < numContainers; i++) { + containers.add(createMockedContainer(this.appId, i)); + } + + dispatcher.start(); + } + + private void drainDispatcherEvents() { + dispatcher.await(); + } + + public void finished() { + dispatcher.stop(); + } + + public void initApplication(int containerNum) { + if (containerNum == -1) { + for (int i = 0; i < containers.size(); i++) { + app.handle(new ApplicationInitEvent(containers.get(i))); + } + } else { + app.handle(new ApplicationInitEvent(containers.get(containerNum))); + } + drainDispatcherEvents(); + } + + public void containerFinished(int containerNum) { + app.handle(new ApplicationContainerFinishedEvent(containers.get( + containerNum).getContainerID())); + drainDispatcherEvents(); + } + + public void applicationInited() { + app.handle(new ApplicationInitedEvent(appId)); + drainDispatcherEvents(); + } + + public void appFinished() { + app.handle(new ApplicationEvent(appId, + ApplicationEventType.FINISH_APPLICATION)); + drainDispatcherEvents(); + } + + public void appResourcesCleanedup() { + app.handle(new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); + drainDispatcherEvents(); + } + } + + private Container createMockedContainer(ApplicationId appId, int containerId) { + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId); + Container c = mock(Container.class); + when(c.getContainerID()).thenReturn(cId); + return c; + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index b0174ad06ec..c4169e56b84 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; import static org.junit.Assert.*; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import static org.mockito.Mockito.*; @@ -355,7 +356,8 @@ public class TestResourceLocalizationService { dispatcher.register(ContainerEventType.class, containerBus); ContainerExecutor exec = mock(ContainerExecutor.class); - DeletionService delService = new DeletionService(exec); + DeletionService delServiceReal = new DeletionService(exec); + DeletionService delService = spy(delServiceReal); delService.init(null); delService.start(); @@ -407,12 +409,14 @@ public class TestResourceLocalizationService { rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req)); spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); // Sigh. Thread init of private localizer not accessible - Thread.sleep(500); + Thread.sleep(1000); dispatcher.await(); String appStr = ConverterUtils.toString(appId); String ctnrStr = c.getContainerID().toString(); - verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class), - eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class)); + ArgumentCaptor tokenPathCaptor = ArgumentCaptor.forClass(Path.class); + verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class), + eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class)); + Path localizationTokenPath = tokenPathCaptor.getValue(); // heartbeat from localizer LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class); @@ -454,10 +458,13 @@ public class TestResourceLocalizationService { }; dispatcher.await(); verify(containerBus).handle(argThat(matchesContainerLoc)); + + // Verify deletion of localization token. + verify(delService).delete((String)isNull(), eq(localizationTokenPath)); } finally { - delService.stop(); - dispatcher.stop(); spyService.stop(); + dispatcher.stop(); + delService.stop(); } }