YARN-4354. Public resource localization fails with NPE. Contributed by Jason Lowe.

This commit is contained in:
Junping Du 2015-11-15 04:43:57 -08:00
parent c753617a48
commit 855d52927b
4 changed files with 67 additions and 6 deletions

View File

@ -1181,6 +1181,9 @@ Release 2.7.2 - UNRELEASED
YARN-4320. TestJobHistoryEventHandler fails as AHS in MiniYarnCluster no longer YARN-4320. TestJobHistoryEventHandler fails as AHS in MiniYarnCluster no longer
binds to default port 8188. (Varun Saxena via ozawa) binds to default port 8188. (Varun Saxena via ozawa)
YARN-4354. Public resource localization fails with NPE. (Jason Lowe via
junping_du)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -185,14 +185,22 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
break; break;
} }
if (rsrc == null) {
LOG.warn("Received " + event.getType() + " event for request " + req
+ " but localized resource is missing");
return;
}
rsrc.handle(event); rsrc.handle(event);
// Remove the resource if its downloading and its reference count has // Remove the resource if its downloading and its reference count has
// become 0 after RELEASE. This maybe because a container was killed while // become 0 after RELEASE. This maybe because a container was killed while
// localizing and no other container is referring to the resource. // localizing and no other container is referring to the resource.
// NOTE: This should NOT be done for public resources since the
// download is not associated with a container-specific localizer.
if (event.getType() == ResourceEventType.RELEASE) { if (event.getType() == ResourceEventType.RELEASE) {
if (rsrc.getState() == ResourceState.DOWNLOADING && if (rsrc.getState() == ResourceState.DOWNLOADING &&
rsrc.getRefCount() <= 0) { rsrc.getRefCount() <= 0 &&
rsrc.getRequest().getVisibility() != LocalResourceVisibility.PUBLIC) {
removeResource(req); removeResource(req);
} }
} }

View File

@ -141,12 +141,12 @@ public class TestLocalResourcesTrackerImpl {
tracker.handle(rel21Event); tracker.handle(rel21Event);
dispatcher.await(); dispatcher.await();
verifyTrackedResourceCount(tracker, 1); verifyTrackedResourceCount(tracker, 2);
// Verify resource with non zero ref count is not removed. // Verify resource with non zero ref count is not removed.
Assert.assertEquals(2, lr1.getRefCount()); Assert.assertEquals(2, lr1.getRefCount());
Assert.assertFalse(tracker.remove(lr1, mockDelService)); Assert.assertFalse(tracker.remove(lr1, mockDelService));
verifyTrackedResourceCount(tracker, 1); verifyTrackedResourceCount(tracker, 2);
// Localize resource1 // Localize resource1
ResourceLocalizedEvent rle = ResourceLocalizedEvent rle =
@ -161,7 +161,7 @@ public class TestLocalResourcesTrackerImpl {
// Verify resources in state LOCALIZED with ref-count=0 is removed. // Verify resources in state LOCALIZED with ref-count=0 is removed.
Assert.assertTrue(tracker.remove(lr1, mockDelService)); Assert.assertTrue(tracker.remove(lr1, mockDelService));
verifyTrackedResourceCount(tracker, 0); verifyTrackedResourceCount(tracker, 1);
} finally { } finally {
if (dispatcher != null) { if (dispatcher != null) {
dispatcher.stop(); dispatcher.stop();
@ -899,6 +899,56 @@ public class TestLocalResourcesTrackerImpl {
} }
} }
@Test
@SuppressWarnings("unchecked")
public void testReleaseWhileDownloading() throws Exception {
String user = "testuser";
DrainDispatcher dispatcher = null;
try {
Configuration conf = new Configuration();
dispatcher = createDispatcher(conf);
EventHandler<LocalizerEvent> localizerEventHandler =
mock(EventHandler.class);
EventHandler<LocalizerEvent> containerEventHandler =
mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
dispatcher.register(ContainerEventType.class, containerEventHandler);
ContainerId cId = BuilderUtils.newContainerId(1, 1, 1, 1);
LocalizerContext lc = new LocalizerContext(user, cId, null);
LocalResourceRequest req =
createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
LocalizedResource lr = createLocalizedResource(req, dispatcher);
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
localrsrc.put(req, lr);
LocalResourcesTracker tracker =
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
false, conf, new NMNullStateStoreService(), null);
// request the resource
ResourceEvent reqEvent =
new ResourceRequestEvent(req, LocalResourceVisibility.PUBLIC, lc);
tracker.handle(reqEvent);
// release the resource
ResourceEvent relEvent = new ResourceReleaseEvent(req, cId);
tracker.handle(relEvent);
// download completing after release
ResourceLocalizedEvent rle =
new ResourceLocalizedEvent(req, new Path("file:///tmp/r1"), 1);
tracker.handle(rle);
dispatcher.await();
} finally {
if (dispatcher != null) {
dispatcher.stop();
}
}
}
private boolean createdummylocalizefile(Path path) { private boolean createdummylocalizefile(Path path) {
boolean ret = false; boolean ret = false;
File file = new File(path.toUri().getRawPath().toString()); File file = new File(path.toUri().getRawPath().toString());

View File

@ -487,8 +487,8 @@ public class TestResourceLocalizationService {
Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount()); Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
pubRsrcs.remove(lr.getRequest()); pubRsrcs.remove(lr.getRequest());
} }
Assert.assertEquals(2, pubRsrcs.size()); Assert.assertEquals(0, pubRsrcs.size());
Assert.assertEquals(0, pubRsrcCount); Assert.assertEquals(2, pubRsrcCount);
appRsrcCount = 0; appRsrcCount = 0;
for (LocalizedResource lr : appTracker) { for (LocalizedResource lr : appTracker) {