diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 4cd1acc5cc8..71971c757de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1036,7 +1036,6 @@ public class ResourceLocalizationService extends CompositeService List remoteResourceStatuses) { LocalizerHeartbeatResponse response = recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); - String user = context.getUser(); ApplicationId applicationId = context.getContainerId().getApplicationAttemptId().getApplicationId(); @@ -1059,14 +1058,19 @@ public class ResourceLocalizationService extends CompositeService LOG.error("Unknown resource reported: " + req); continue; } + LocalResourcesTracker tracker = + getLocalResourcesTracker(req.getVisibility(), user, applicationId); + if (tracker == null) { + // This is likely due to a race between heartbeat and + // app cleaning up. + continue; + } switch (stat.getStatus()) { case FETCH_SUCCESS: // notify resource try { - getLocalResourcesTracker(req.getVisibility(), user, applicationId) - .handle( - new ResourceLocalizedEvent(req, stat.getLocalPath().toPath(), - stat.getLocalSize())); + tracker.handle(new ResourceLocalizedEvent(req, + stat.getLocalPath().toPath(), stat.getLocalSize())); } catch (URISyntaxException e) { } // unlocking the resource and removing it from scheduled resource @@ -1080,9 +1084,8 @@ public class ResourceLocalizationService extends CompositeService final String diagnostics = stat.getException().toString(); LOG.warn(req + " failed: " + diagnostics); fetchFailed = true; - getLocalResourcesTracker(req.getVisibility(), user, applicationId) - .handle(new ResourceFailedLocalizationEvent( - req, diagnostics)); + tracker.handle(new ResourceFailedLocalizationEvent(req, + diagnostics)); // unlocking the resource and removing it from scheduled resource // list @@ -1092,9 +1095,8 @@ public class ResourceLocalizationService extends CompositeService default: LOG.info("Unknown status: " + stat.getStatus()); fetchFailed = true; - getLocalResourcesTracker(req.getVisibility(), user, applicationId) - .handle(new ResourceFailedLocalizationEvent( - req, stat.getException().getMessage())); + tracker.handle(new ResourceFailedLocalizationEvent(req, + stat.getException().getMessage())); break; } } @@ -1114,10 +1116,14 @@ public class ResourceLocalizationService extends CompositeService LocalResource next = findNextResource(); if (next != null) { try { - ResourceLocalizationSpec resource = - NodeManagerBuilderUtils.newResourceLocalizationSpec(next, - getPathForLocalization(next)); - rsrcs.add(resource); + LocalResourcesTracker tracker = getLocalResourcesTracker( + next.getVisibility(), user, applicationId); + if (tracker != null) { + ResourceLocalizationSpec resource = + NodeManagerBuilderUtils.newResourceLocalizationSpec(next, + getPathForLocalization(next, tracker)); + rsrcs.add(resource); + } } catch (IOException e) { LOG.error("local path for PRIVATE localization could not be " + "found. Disks might have failed.", e); @@ -1136,14 +1142,12 @@ public class ResourceLocalizationService extends CompositeService return response; } - private Path getPathForLocalization(LocalResource rsrc) throws IOException, - URISyntaxException { + private Path getPathForLocalization(LocalResource rsrc, + LocalResourcesTracker tracker) throws IOException, URISyntaxException { String user = context.getUser(); ApplicationId appId = context.getContainerId().getApplicationAttemptId().getApplicationId(); LocalResourceVisibility vis = rsrc.getVisibility(); - LocalResourcesTracker tracker = - getLocalResourcesTracker(vis, user, appId); String cacheDirectory = null; if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only cacheDirectory = getUserFileCachePath(user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 9404478c738..2692162b31a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; @@ -147,7 +148,6 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -1482,6 +1482,114 @@ public class TestResourceLocalizationService { } } + @Test(timeout = 20000) + @SuppressWarnings("unchecked") + public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception { + conf.set(YarnConfiguration.NM_LOCAL_DIRS, + lfs.makeQualified(new Path(basedir, 0 + "")).toString()); + // Start dispatcher. + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + dispatcher.register(ApplicationEventType.class, mock(EventHandler.class)); + dispatcher.register(ContainerEventType.class, mock(EventHandler.class)); + + DummyExecutor exec = new DummyExecutor(); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + // Start resource localization service. + ResourceLocalizationService rawService = new ResourceLocalizationService( + dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService). + getLocalFileContext(isA(Configuration.class)); + try { + spyService.init(conf); + spyService.start(); + + // Init application resources. + final Application app = mock(Application.class); + final ApplicationId appId = BuilderUtils.newApplicationId(1234567890L, 3); + when(app.getUser()).thenReturn("user0"); + when(app.getAppId()).thenReturn(appId); + when(app.toString()).thenReturn(appId.toString()); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // Initialize localizer. + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + final Container c = getMockContainer(appId, 46, "user0"); + FSDataOutputStream out = + new FSDataOutputStream(new DataOutputBuffer(), null); + doReturn(out).when(spylfs).createInternal(isA(Path.class), + isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), + anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), + anyBoolean()); + final LocalResource resource1 = getAppMockedResource(r); + final LocalResource resource2 = getAppMockedResource(r); + + // Send localization requests for container. + // 2 resources generated with APPLICATION visibility. + final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + final LocalResourceRequest req2 = new LocalResourceRequest(resource2); + Map> rsrcs = + new HashMap>(); + List appResourceList = Arrays.asList(req1, req2); + rsrcs.put(LocalResourceVisibility.APPLICATION, appResourceList); + spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); + dispatcher.await(); + + // Wait for localization to begin. + exec.waitForLocalizers(1); + final String containerIdStr = c.getContainerId().toString(); + LocalizerRunner locRunnerForContainer = + spyService.getLocalizerRunner(containerIdStr); + // Heartbeats from container localizer + LocalResourceStatus rsrcSuccess = mock(LocalResourceStatus.class); + LocalizerStatus stat = mock(LocalizerStatus.class); + when(stat.getLocalizerId()).thenReturn(containerIdStr); + when(rsrcSuccess.getResource()).thenReturn(resource1); + when(rsrcSuccess.getLocalSize()).thenReturn(4344L); + when(rsrcSuccess.getLocalPath()).thenReturn(getPath("/some/path")); + when(rsrcSuccess.getStatus()). + thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(stat.getResources()). + thenReturn(Collections.emptyList()); + + // First heartbeat which schedules first resource. + LocalizerHeartbeatResponse response = spyService.heartbeat(stat); + assertEquals("NM should tell localizer to be LIVE in Heartbeat.", + LocalizerAction.LIVE, response.getLocalizerAction()); + + // Cleanup application. + spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs)); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app)); + dispatcher.await(); + try { + // Directly send heartbeat to introduce race as app is being cleaned up. + locRunnerForContainer.processHeartbeat( + Collections.singletonList(rsrcSuccess)); + } catch (Exception e) { + fail("Exception should not have been thrown on processing heartbeat"); + } + // Send another heartbeat. + response = spyService.heartbeat(stat); + assertEquals("NM should tell localizer to DIE in Heartbeat.", + LocalizerAction.DIE, response.getLocalizerAction()); + exec.setStopLocalization(); + } finally { + spyService.stop(); + dispatcher.stop(); + } + } + @Test(timeout=20000) @SuppressWarnings("unchecked") // mocked generics public void testFailedPublicResource() throws Exception {