YARN-4355. NPE while processing localizer heartbeat. Contributed by Varun Saxena & Jonathan Hung.

(cherry picked from commit 7ffb9943b8)
This commit is contained in:
Naganarasimha 2016-11-15 15:41:56 +05:30
parent 7e9a6b653e
commit 260f3a9dc9
2 changed files with 132 additions and 20 deletions

View File

@ -1036,7 +1036,6 @@ public class ResourceLocalizationService extends CompositeService
List<LocalResourceStatus> remoteResourceStatuses) { List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response = LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
String user = context.getUser(); String user = context.getUser();
ApplicationId applicationId = ApplicationId applicationId =
context.getContainerId().getApplicationAttemptId().getApplicationId(); context.getContainerId().getApplicationAttemptId().getApplicationId();
@ -1059,14 +1058,19 @@ public class ResourceLocalizationService extends CompositeService
LOG.error("Unknown resource reported: " + req); LOG.error("Unknown resource reported: " + req);
continue; continue;
} }
LocalResourcesTracker tracker =
getLocalResourcesTracker(req.getVisibility(), user, applicationId);
if (tracker == null) {
// This is likely due to a race between heartbeat and
// app cleaning up.
continue;
}
switch (stat.getStatus()) { switch (stat.getStatus()) {
case FETCH_SUCCESS: case FETCH_SUCCESS:
// notify resource // notify resource
try { try {
getLocalResourcesTracker(req.getVisibility(), user, applicationId) tracker.handle(new ResourceLocalizedEvent(req,
.handle( stat.getLocalPath().toPath(), stat.getLocalSize()));
new ResourceLocalizedEvent(req, stat.getLocalPath().toPath(),
stat.getLocalSize()));
} catch (URISyntaxException e) { } } catch (URISyntaxException e) { }
// unlocking the resource and removing it from scheduled resource // unlocking the resource and removing it from scheduled resource
@ -1080,9 +1084,8 @@ public class ResourceLocalizationService extends CompositeService
final String diagnostics = stat.getException().toString(); final String diagnostics = stat.getException().toString();
LOG.warn(req + " failed: " + diagnostics); LOG.warn(req + " failed: " + diagnostics);
fetchFailed = true; fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId) tracker.handle(new ResourceFailedLocalizationEvent(req,
.handle(new ResourceFailedLocalizationEvent( diagnostics));
req, diagnostics));
// unlocking the resource and removing it from scheduled resource // unlocking the resource and removing it from scheduled resource
// list // list
@ -1092,9 +1095,8 @@ public class ResourceLocalizationService extends CompositeService
default: default:
LOG.info("Unknown status: " + stat.getStatus()); LOG.info("Unknown status: " + stat.getStatus());
fetchFailed = true; fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId) tracker.handle(new ResourceFailedLocalizationEvent(req,
.handle(new ResourceFailedLocalizationEvent( stat.getException().getMessage()));
req, stat.getException().getMessage()));
break; break;
} }
} }
@ -1114,10 +1116,14 @@ public class ResourceLocalizationService extends CompositeService
LocalResource next = findNextResource(); LocalResource next = findNextResource();
if (next != null) { if (next != null) {
try { try {
LocalResourcesTracker tracker = getLocalResourcesTracker(
next.getVisibility(), user, applicationId);
if (tracker != null) {
ResourceLocalizationSpec resource = ResourceLocalizationSpec resource =
NodeManagerBuilderUtils.newResourceLocalizationSpec(next, NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
getPathForLocalization(next)); getPathForLocalization(next, tracker));
rsrcs.add(resource); rsrcs.add(resource);
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("local path for PRIVATE localization could not be " + LOG.error("local path for PRIVATE localization could not be " +
"found. Disks might have failed.", e); "found. Disks might have failed.", e);
@ -1136,14 +1142,12 @@ public class ResourceLocalizationService extends CompositeService
return response; return response;
} }
private Path getPathForLocalization(LocalResource rsrc) throws IOException, private Path getPathForLocalization(LocalResource rsrc,
URISyntaxException { LocalResourcesTracker tracker) throws IOException, URISyntaxException {
String user = context.getUser(); String user = context.getUser();
ApplicationId appId = ApplicationId appId =
context.getContainerId().getApplicationAttemptId().getApplicationId(); context.getContainerId().getApplicationAttemptId().getApplicationId();
LocalResourceVisibility vis = rsrc.getVisibility(); LocalResourceVisibility vis = rsrc.getVisibility();
LocalResourcesTracker tracker =
getLocalResourcesTracker(vis, user, appId);
String cacheDirectory = null; String cacheDirectory = null;
if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
cacheDirectory = getUserFileCachePath(user); cacheDirectory = getUserFileCachePath(user);

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
@ -147,7 +148,6 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -1482,6 +1482,114 @@ public class TestResourceLocalizationService {
} }
} }
@Test(timeout = 20000)
@SuppressWarnings("unchecked")
public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
conf.set(YarnConfiguration.NM_LOCAL_DIRS,
lfs.makeQualified(new Path(basedir, 0 + "")).toString());
// Start dispatcher.
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
dispatcher.register(ApplicationEventType.class, mock(EventHandler.class));
dispatcher.register(ContainerEventType.class, mock(EventHandler.class));
DummyExecutor exec = new DummyExecutor();
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
// Start resource localization service.
ResourceLocalizationService rawService = new ResourceLocalizationService(
dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).
getLocalFileContext(isA(Configuration.class));
try {
spyService.init(conf);
spyService.start();
// Init application resources.
final Application app = mock(Application.class);
final ApplicationId appId = BuilderUtils.newApplicationId(1234567890L, 3);
when(app.getUser()).thenReturn("user0");
when(app.getAppId()).thenReturn(appId);
when(app.toString()).thenReturn(appId.toString());
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
// Initialize localizer.
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
final Container c = getMockContainer(appId, 46, "user0");
FSDataOutputStream out =
new FSDataOutputStream(new DataOutputBuffer(), null);
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
anyBoolean());
final LocalResource resource1 = getAppMockedResource(r);
final LocalResource resource2 = getAppMockedResource(r);
// Send localization requests for container.
// 2 resources generated with APPLICATION visibility.
final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
List<LocalResourceRequest> appResourceList = Arrays.asList(req1, req2);
rsrcs.put(LocalResourceVisibility.APPLICATION, appResourceList);
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
dispatcher.await();
// Wait for localization to begin.
exec.waitForLocalizers(1);
final String containerIdStr = c.getContainerId().toString();
LocalizerRunner locRunnerForContainer =
spyService.getLocalizerRunner(containerIdStr);
// Heartbeats from container localizer
LocalResourceStatus rsrcSuccess = mock(LocalResourceStatus.class);
LocalizerStatus stat = mock(LocalizerStatus.class);
when(stat.getLocalizerId()).thenReturn(containerIdStr);
when(rsrcSuccess.getResource()).thenReturn(resource1);
when(rsrcSuccess.getLocalSize()).thenReturn(4344L);
when(rsrcSuccess.getLocalPath()).thenReturn(getPath("/some/path"));
when(rsrcSuccess.getStatus()).
thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(stat.getResources()).
thenReturn(Collections.<LocalResourceStatus>emptyList());
// First heartbeat which schedules first resource.
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
assertEquals("NM should tell localizer to be LIVE in Heartbeat.",
LocalizerAction.LIVE, response.getLocalizerAction());
// Cleanup application.
spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs));
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app));
dispatcher.await();
try {
// Directly send heartbeat to introduce race as app is being cleaned up.
locRunnerForContainer.processHeartbeat(
Collections.singletonList(rsrcSuccess));
} catch (Exception e) {
fail("Exception should not have been thrown on processing heartbeat");
}
// Send another heartbeat.
response = spyService.heartbeat(stat);
assertEquals("NM should tell localizer to DIE in Heartbeat.",
LocalizerAction.DIE, response.getLocalizerAction());
exec.setStopLocalization();
} finally {
spyService.stop();
dispatcher.stop();
}
}
@Test(timeout=20000) @Test(timeout=20000)
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics
public void testFailedPublicResource() throws Exception { public void testFailedPublicResource() throws Exception {