diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 0494c2d96bb..3e4af2c7036 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -141,6 +141,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils; import org.apache.hadoop.yarn.util.FSDownload; +import org.apache.hadoop.yarn.util.LRUCacheHashMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; @@ -722,6 +723,8 @@ class LocalizerTracker extends AbstractService implements EventHandler privLocalizers; + private final Map recentlyCleanedLocalizers; + private final int maxRecentlyCleaned = 128; LocalizerTracker(Configuration conf) { this(conf, new HashMap()); @@ -732,6 +735,8 @@ class LocalizerTracker extends AbstractService implements EventHandler(maxRecentlyCleaned, false); } @Override @@ -783,14 +788,24 @@ public void handle(LocalizerEvent event) { synchronized (privLocalizers) { LocalizerRunner localizer = privLocalizers.get(locId); if (localizer != null && localizer.killContainerLocalizer.get()) { - // Old localizer thread has been stopped, remove it and creates + // Old localizer thread has been stopped, remove it and create // a new localizer thread. LOG.info("New " + event.getType() + " localize request for " + locId + ", remove old private localizer."); - cleanupPrivLocalizers(locId); + privLocalizers.remove(locId); + localizer.interrupt(); localizer = null; } if (null == localizer) { + // Don't create a new localizer if this one has been recently + // cleaned up - this can happen if localization requests come + // in after cleanupPrivLocalizers has been called. + if (recentlyCleanedLocalizers.containsKey(locId)) { + LOG.info( + "Skipping localization request for recently cleaned " + + "localizer " + locId + " resource:" + req.getResource()); + break; + } LOG.info("Created localizer for " + locId); localizer = new LocalizerRunner(req.getContext(), locId); privLocalizers.put(locId, localizer); @@ -808,6 +823,7 @@ public void handle(LocalizerEvent event) { public void cleanupPrivLocalizers(String locId) { synchronized (privLocalizers) { LocalizerRunner localizer = privLocalizers.get(locId); + recentlyCleanedLocalizers.put(locId, locId); if (null == localizer) { return; // ignore; already gone } @@ -1047,44 +1063,74 @@ public void endContainerLocalization() { * * @return the next resource to be localized */ - private LocalResource findNextResource() { + private ResourceLocalizationSpec findNextResource( + String user, ApplicationId applicationId) { synchronized (pending) { for (Iterator i = pending.iterator(); i.hasNext();) { - LocalizerResourceRequestEvent evt = i.next(); - LocalizedResource nRsrc = evt.getResource(); - // Resource download should take place ONLY if resource is in - // Downloading state - if (nRsrc.getState() != ResourceState.DOWNLOADING) { - i.remove(); - continue; - } - /* - * Multiple containers will try to download the same resource. So the - * resource download should start only if - * 1) We can acquire a non blocking semaphore lock on resource - * 2) Resource is still in DOWNLOADING state - */ - if (nRsrc.tryAcquire()) { - if (nRsrc.getState() == ResourceState.DOWNLOADING) { - LocalResourceRequest nextRsrc = nRsrc.getRequest(); - LocalResource next = - recordFactory.newRecordInstance(LocalResource.class); - next.setResource(URL.fromPath(nextRsrc - .getPath())); - next.setTimestamp(nextRsrc.getTimestamp()); - next.setType(nextRsrc.getType()); - next.setVisibility(evt.getVisibility()); - next.setPattern(evt.getPattern()); - scheduled.put(nextRsrc, evt); - return next; - } else { - // Need to release acquired lock - nRsrc.unlock(); - } - } - } - return null; + LocalizerResourceRequestEvent evt = i.next(); + LocalizedResource nRsrc = evt.getResource(); + // Resource download should take place ONLY if resource is in + // Downloading state + if (nRsrc.getState() != ResourceState.DOWNLOADING) { + i.remove(); + continue; + } + /* + * Multiple containers will try to download the same resource. So the + * resource download should start only if + * 1) We can acquire a non blocking semaphore lock on resource + * 2) Resource is still in DOWNLOADING state + */ + if (nRsrc.tryAcquire()) { + if (nRsrc.getState() == ResourceState.DOWNLOADING) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource(URL.fromPath(nextRsrc.getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + next.setPattern(evt.getPattern()); + ResourceLocalizationSpec nextSpec = null; + try { + LocalResourcesTracker tracker = getLocalResourcesTracker( + next.getVisibility(), user, applicationId); + if (tracker != null) { + Path localPath = getPathForLocalization(next, tracker); + if (localPath != null) { + nextSpec = NodeManagerBuilderUtils. + newResourceLocalizationSpec(next, localPath); + } + } + } catch (IOException e) { + LOG.error("local path for PRIVATE localization could not be " + + "found. Disks might have failed.", e); + } catch (IllegalArgumentException e) { + LOG.error("Incorrect path for PRIVATE localization." + + next.getResource().getFile(), e); + } catch (URISyntaxException e) { + LOG.error( + "Got exception in parsing URL of LocalResource:" + + next.getResource(), e); + } + if (nextSpec != null) { + scheduled.put(nextRsrc, evt); + return nextSpec; + } else { + // We failed to get a path for this, don't try to localize this + // resource again. + nRsrc.unlock(); + i.remove(); + continue; + } + } else { + // Need to release acquired lock + nRsrc.unlock(); + } + } + } + return null; } } @@ -1170,29 +1216,9 @@ LocalizerHeartbeatResponse processHeartbeat( * TODO : It doesn't support multiple downloads per ContainerLocalizer * at the same time. We need to think whether we should support this. */ - LocalResource next = findNextResource(); + ResourceLocalizationSpec next = findNextResource(user, applicationId); if (next != null) { - try { - LocalResourcesTracker tracker = getLocalResourcesTracker( - next.getVisibility(), user, applicationId); - if (tracker != null) { - Path localPath = getPathForLocalization(next, tracker); - if (localPath != null) { - rsrcs.add(NodeManagerBuilderUtils.newResourceLocalizationSpec( - next, localPath)); - } - } - } catch (IOException e) { - LOG.error("local path for PRIVATE localization could not be " + - "found. Disks might have failed.", e); - } catch (IllegalArgumentException e) { - LOG.error("Incorrect path for PRIVATE localization." - + next.getResource().getFile(), e); - } catch (URISyntaxException e) { - LOG.error( - "Got exception in parsing URL of LocalResource:" - + next.getResource(), e); - } + rsrcs.add(next); } response.setLocalizerAction(LocalizerAction.LIVE); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 8871bf6d1e4..cb877c4e243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -140,6 +140,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -1108,9 +1110,21 @@ private void waitForLocalizers(int num) { Thread.yield(); } } + private void yieldForLocalizers(int num) { + for (int i = 0; i < num; i++) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + continue; + } + } + } private void setStopLocalization() { stopLocalization = true; } + private int getNumLocalizers() { + return numLocalizers.get(); + } } @Test(timeout = 20000) @@ -1137,7 +1151,8 @@ public void testDownloadingResourcesOnContainerKill() throws Exception { ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); - doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); + doReturn(lfs).when(spyService). + getLocalFileContext(isA(Configuration.class)); FsPermission defaultPermission = FsPermission.getDirDefault().applyUMask(lfs.getUMask()); FsPermission nmPermission = @@ -1184,6 +1199,78 @@ public FileStatus answer(InvocationOnMock invocation) throws Throwable { } } + @Test + public void testResourceLocalizationReqsAfterContainerKill() + throws Exception { + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + localDirs.add(lfs.makeQualified(new Path(basedir, 0 + ""))); + sDirs[0] = localDirs.get(0).toString(); + + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + + DummyExecutor exec = new DummyExecutor(); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + DeletionService delServiceReal = new DeletionService(exec); + DeletionService delService = spy(delServiceReal); + delService.init(new Configuration()); + delService.start(); + + DrainDispatcher dispatcher = getDispatcher(conf); + ResourceLocalizationService rawService = new ResourceLocalizationService( + dispatcher, exec, delService, dirsHandler, nmContext, metrics); + + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPermission = + ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + final Path userDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.USERCACHE); + final Path fileDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.FILECACHE); + final Path sysDir = + new Path(sDirs[0].substring("file:".length()), + ResourceLocalizationService.NM_PRIVATE_DIR); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", new Path(sDirs[0])); + final FileStatus nmFs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + nmPermission, "", "", sysDir); + + doAnswer(new Answer() { + @Override + public FileStatus answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + if (args.length > 0) { + if (args[0].equals(userDir) || args[0].equals(fileDir)) { + return fs; + } + } + return nmFs; + } + }).when(spylfs).getFileStatus(isA(Path.class)); + + try { + spyService.init(conf); + spyService.start(); + + doLocalizationAfterCleanup(spyService, dispatcher, exec, delService); + + } finally { + spyService.stop(); + dispatcher.stop(); + delService.stop(); + } + } + private DrainDispatcher getDispatcher(Configuration config) { DrainDispatcher dispatcher = new DrainDispatcher(); dispatcher.init(config); @@ -1342,6 +1429,149 @@ private void doLocalization(ResourceLocalizationService spyService, assertNull(rsrc3); } + private void doLocalizationAfterCleanup( + ResourceLocalizationService spyService, + DrainDispatcher dispatcher, DummyExecutor exec, + DeletionService delService) + throws IOException, URISyntaxException, InterruptedException { + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + String user = "user0"; + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + List resources = initializeLocalizer(appId); + LocalResource resource1 = resources.get(0); + LocalResource resource2 = resources.get(1); + LocalResource resource3 = resources.get(2); + final Container c1 = getMockContainer(appId, 42, "user0"); + final Container c2 = getMockContainer(appId, 43, "user0"); + + EventHandler applicationBus = + getApplicationBus(dispatcher); + EventHandler containerBus = getContainerBus(dispatcher); + initApp(spyService, applicationBus, app, appId, dispatcher); + + // Send localization requests for container c1 and c2. + final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + final LocalResourceRequest req2 = new LocalResourceRequest(resource2); + final LocalResourceRequest req3 = new LocalResourceRequest(resource3); + Map> rsrcs = + new HashMap>(); + List privateResourceList = + new ArrayList(); + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); + + // Start Localization without any resources (so we can simulate the + // resource requests being delayed until after cleanup. + spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs)); + dispatcher.await(); + + // Kill c1 which leads to cleanup + spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs)); + dispatcher.await(); + + // Now we will send the resource requests and releases directly to tracker + privateResourceList.add(req1); + privateResourceList.add(req2); + privateResourceList.add(req3); + + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); + LocalizerContext locCtx = + new LocalizerContext(user, c1.getContainerId(), c1.getCredentials()); + LocalResourcesTracker tracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, + user, null); + for (LocalResourceRequest req : privateResourceList) { + tracker.handle( + new ResourceRequestEvent(req, LocalResourceVisibility.PRIVATE, + locCtx)); + } + dispatcher.await(); + for (LocalResourceRequest req : privateResourceList) { + tracker.handle( + new ResourceReleaseEvent(req, c1.getContainerId())); + } + dispatcher.await(); + + // Now start a second container with the same list of resources + spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs)); + dispatcher.await(); + + // Wait for localizers to begin (should only be one for container2) + exec.yieldForLocalizers(2); + assertThat(exec.getNumLocalizers()).isEqualTo(1); + + LocalizerRunner locC2 = + spyService.getLocalizerRunner(c2.getContainerId().toString()); + LocalizerStatus stat = mockLocalizerStatus(c2, resource1, resource2); + + // First heartbeat which schedules first resource. + LocalizerHeartbeatResponse response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + + // Second heartbeat which reports first resource as success. + // Second resource is scheduled. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + final String locPath1 = + response.getResourceSpecs().get(0).getDestinationDirectory().getFile(); + + // Third heartbeat which reports second resource as pending. + // Third resource is scheduled. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + final String locPath2 = + response.getResourceSpecs().get(0).getDestinationDirectory().getFile(); + + // Container c2 is killed which leads to cleanup + spyService.handle(new ContainerLocalizationCleanupEvent(c2, rsrcs)); + + // This heartbeat will indicate to container localizer to die as localizer + // runner has stopped. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); + + exec.setStopLocalization(); + dispatcher.await(); + + // verify container notification + ArgumentMatcher successContainerLoc = + evt -> evt.getType() == ContainerEventType.RESOURCE_LOCALIZED + && c2.getContainerId() == evt.getContainerID(); + // Only one resource gets localized for container c2. + verify(containerBus).handle(argThat(successContainerLoc)); + + Set paths = + Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"), + new Path(locPath2), new Path(locPath2 + "_tmp")); + // Wait for localizer runner thread for container c1 to finish. + while (locC2.getState() != Thread.State.TERMINATED) { + Thread.sleep(50); + } + // Verify if downloading resources were submitted for deletion. + verify(delService, times(3)).delete(argThat(new FileDeletionMatcher( + delService, user, null, new ArrayList<>(paths)))); + + // Container c2 was killed but this resource was localized before kill + // hence its not removed despite ref cnt being 0. + LocalizedResource rsrc1 = tracker.getLocalizedResource(req1); + assertNotNull(rsrc1); + assertThat(rsrc1.getState()).isEqualTo(ResourceState.LOCALIZED); + assertThat(rsrc1.getRefCount()).isEqualTo(0); + + // Container c1 and c2 were killed before this finished downloading + // these should no longer be there. + LocalizedResource rsrc2 = tracker.getLocalizedResource(req2); + assertNull(rsrc2); + LocalizedResource rsrc3 = tracker.getLocalizedResource(req3); + assertNull(rsrc3); + + // Double-check that we never created a Localizer for C1 + assertThat(exec.getNumLocalizers()).isEqualTo(1); + } + private LocalizerStatus mockLocalizerStatus(Container c1, LocalResource resource1, LocalResource resource2) { final String containerIdStr = c1.getContainerId().toString();