YARN-3024. LocalizerRunner should give DIE action when all resources are

localized. Contributed by Chengbing Liu

(cherry picked from commit 0d6bd62102)
This commit is contained in:
Xuan 2015-01-25 19:37:57 -08:00
parent d46738ca5c
commit a7696b3fbf
3 changed files with 91 additions and 82 deletions

View File

@ -166,6 +166,9 @@ Release 2.7.0 - UNRELEASED
YARN-2800. Remove MemoryNodeLabelsStore and add a way to enable/disable YARN-2800. Remove MemoryNodeLabelsStore and add a way to enable/disable
node labels feature. (Wangda Tan via ozawa) node labels feature. (Wangda Tan via ozawa)
YARN-3024. LocalizerRunner should give DIE action when all resources are
localized. (Chengbing Liu via xgong)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -763,7 +763,7 @@ public class ResourceLocalizationService extends CompositeService
*/ */
if (rsrc.tryAcquire()) { if (rsrc.tryAcquire()) {
if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { if (rsrc.getState() == ResourceState.DOWNLOADING) {
LocalResource resource = request.getResource().getRequest(); LocalResource resource = request.getResource().getRequest();
try { try {
Path publicRootPath = Path publicRootPath =
@ -895,7 +895,7 @@ public class ResourceLocalizationService extends CompositeService
LocalizedResource nRsrc = evt.getResource(); LocalizedResource nRsrc = evt.getResource();
// Resource download should take place ONLY if resource is in // Resource download should take place ONLY if resource is in
// Downloading state // Downloading state
if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { if (nRsrc.getState() != ResourceState.DOWNLOADING) {
i.remove(); i.remove();
continue; continue;
} }
@ -906,7 +906,7 @@ public class ResourceLocalizationService extends CompositeService
* 2) Resource is still in DOWNLOADING state * 2) Resource is still in DOWNLOADING state
*/ */
if (nRsrc.tryAcquire()) { if (nRsrc.tryAcquire()) {
if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { if (nRsrc.getState() == ResourceState.DOWNLOADING) {
LocalResourceRequest nextRsrc = nRsrc.getRequest(); LocalResourceRequest nextRsrc = nRsrc.getRequest();
LocalResource next = LocalResource next =
recordFactory.newRecordInstance(LocalResource.class); recordFactory.newRecordInstance(LocalResource.class);
@ -936,41 +936,9 @@ public class ResourceLocalizationService extends CompositeService
String user = context.getUser(); String user = context.getUser();
ApplicationId applicationId = ApplicationId applicationId =
context.getContainerId().getApplicationAttemptId().getApplicationId(); context.getContainerId().getApplicationAttemptId().getApplicationId();
// The localizer has just spawned. Start giving it resources for
// remote-fetching.
if (remoteResourceStatuses.isEmpty()) {
LocalResource next = findNextResource();
if (next != null) {
response.setLocalizerAction(LocalizerAction.LIVE);
try {
ArrayList<ResourceLocalizationSpec> rsrcs =
new ArrayList<ResourceLocalizationSpec>();
ResourceLocalizationSpec rsrc =
NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
getPathForLocalization(next));
rsrcs.add(rsrc);
response.setResourceSpecs(rsrcs);
} catch (IOException e) {
LOG.error("local path for PRIVATE localization could not be found."
+ "Disks might have failed.", e);
} catch (URISyntaxException e) {
// TODO fail? Already translated several times...
}
} else if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
} else {
response.setLocalizerAction(LocalizerAction.LIVE);
}
return response;
}
ArrayList<ResourceLocalizationSpec> rsrcs =
new ArrayList<ResourceLocalizationSpec>();
/*
* TODO : It doesn't support multiple downloads per ContainerLocalizer
* at the same time. We need to think whether we should support this.
*/
LocalizerAction action = LocalizerAction.LIVE;
// Update resource statuses.
for (LocalResourceStatus stat : remoteResourceStatuses) { for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource(); LocalResource rsrc = stat.getResource();
LocalResourceRequest req = null; LocalResourceRequest req = null;
@ -999,30 +967,8 @@ public class ResourceLocalizationService extends CompositeService
// list // list
assoc.getResource().unlock(); assoc.getResource().unlock();
scheduled.remove(req); scheduled.remove(req);
if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
break;
}
response.setLocalizerAction(LocalizerAction.LIVE);
LocalResource next = findNextResource();
if (next != null) {
try {
ResourceLocalizationSpec resource =
NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
getPathForLocalization(next));
rsrcs.add(resource);
} catch (IOException e) {
LOG.error("local path for PRIVATE localization could not be " +
"found. Disks might have failed.", e);
} catch (URISyntaxException e) {
//TODO fail? Already translated several times...
}
}
break; break;
case FETCH_PENDING: case FETCH_PENDING:
response.setLocalizerAction(LocalizerAction.LIVE);
break; break;
case FETCH_FAILURE: case FETCH_FAILURE:
final String diagnostics = stat.getException().toString(); final String diagnostics = stat.getException().toString();
@ -1036,17 +982,48 @@ public class ResourceLocalizationService extends CompositeService
// list // list
assoc.getResource().unlock(); assoc.getResource().unlock();
scheduled.remove(req); scheduled.remove(req);
break; break;
default: default:
LOG.info("Unknown status: " + stat.getStatus()); LOG.info("Unknown status: " + stat.getStatus());
response.setLocalizerAction(LocalizerAction.DIE); action = LocalizerAction.DIE;
getLocalResourcesTracker(req.getVisibility(), user, applicationId) getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent( .handle(new ResourceFailedLocalizationEvent(
req, stat.getException().getMessage())); req, stat.getException().getMessage()));
break; break;
} }
} }
if (action == LocalizerAction.DIE) {
response.setLocalizerAction(action);
return response;
}
// Give the localizer resources for remote-fetching.
List<ResourceLocalizationSpec> rsrcs =
new ArrayList<ResourceLocalizationSpec>();
/*
* TODO : It doesn't support multiple downloads per ContainerLocalizer
* at the same time. We need to think whether we should support this.
*/
LocalResource next = findNextResource();
if (next != null) {
try {
ResourceLocalizationSpec resource =
NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
getPathForLocalization(next));
rsrcs.add(resource);
} catch (IOException e) {
LOG.error("local path for PRIVATE localization could not be " +
"found. Disks might have failed.", e);
} catch (URISyntaxException e) {
//TODO fail? Already translated several times...
}
} else if (pending.isEmpty()) {
// TODO: Synchronization
action = LocalizerAction.DIE;
}
response.setLocalizerAction(action);
response.setResourceSpecs(rsrcs); response.setResourceSpecs(rsrcs);
return response; return response;
} }

View File

@ -827,10 +827,16 @@ public class TestResourceLocalizationService {
do { do {
resource2 = getPrivateMockedResource(r); resource2 = getPrivateMockedResource(r);
} while (resource2 == null || resource2.equals(resource1)); } while (resource2 == null || resource2.equals(resource1));
LocalResource resource3 = null;
do {
resource3 = getPrivateMockedResource(r);
} while (resource3 == null || resource3.equals(resource1)
|| resource3.equals(resource2));
// above call to make sure we don't get identical resources. // above call to make sure we don't get identical resources.
final LocalResourceRequest req1 = new LocalResourceRequest(resource1); final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
final LocalResourceRequest req2 = new LocalResourceRequest(resource2); final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs = Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility, new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>(); Collection<LocalResourceRequest>>();
@ -838,6 +844,7 @@ public class TestResourceLocalizationService {
new ArrayList<LocalResourceRequest>(); new ArrayList<LocalResourceRequest>();
privateResourceList.add(req1); privateResourceList.add(req1);
privateResourceList.add(req2); privateResourceList.add(req2);
privateResourceList.add(req3);
rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible // Sigh. Thread init of private localizer not accessible
@ -852,30 +859,47 @@ public class TestResourceLocalizationService {
Path localizationTokenPath = tokenPathCaptor.getValue(); Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer // heartbeat from localizer
LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class); LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class); LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
LocalResourceStatus rsrc2success = mock(LocalResourceStatus.class);
LocalResourceStatus rsrc3success = mock(LocalResourceStatus.class);
LocalizerStatus stat = mock(LocalizerStatus.class); LocalizerStatus stat = mock(LocalizerStatus.class);
when(stat.getLocalizerId()).thenReturn(ctnrStr); when(stat.getLocalizerId()).thenReturn(ctnrStr);
when(rsrcStat1.getResource()).thenReturn(resource1); when(rsrc1success.getResource()).thenReturn(resource1);
when(rsrcStat2.getResource()).thenReturn(resource2); when(rsrc2pending.getResource()).thenReturn(resource2);
when(rsrcStat1.getLocalSize()).thenReturn(4344L); when(rsrc2success.getResource()).thenReturn(resource2);
when(rsrcStat2.getLocalSize()).thenReturn(2342L); when(rsrc3success.getResource()).thenReturn(resource3);
when(rsrc1success.getLocalSize()).thenReturn(4344L);
when(rsrc2success.getLocalSize()).thenReturn(2342L);
when(rsrc3success.getLocalSize()).thenReturn(5345L);
URL locPath = getPath("/cache/private/blah"); URL locPath = getPath("/cache/private/blah");
when(rsrcStat1.getLocalPath()).thenReturn(locPath); when(rsrc1success.getLocalPath()).thenReturn(locPath);
when(rsrcStat2.getLocalPath()).thenReturn(locPath); when(rsrc2success.getLocalPath()).thenReturn(locPath);
when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); when(rsrc3success.getLocalPath()).thenReturn(locPath);
when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING);
when(rsrc2success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(rsrc3success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
// Four heartbeats with sending:
// 1 - empty
// 2 - resource1 FETCH_SUCCESS
// 3 - resource2 FETCH_PENDING
// 4 - resource2 FETCH_SUCCESS, resource3 FETCH_SUCCESS
List<LocalResourceStatus> rsrcs4 = new ArrayList<LocalResourceStatus>();
rsrcs4.add(rsrc2success);
rsrcs4.add(rsrc3success);
when(stat.getResources()) when(stat.getResources())
.thenReturn(Collections.<LocalResourceStatus>emptyList()) .thenReturn(Collections.<LocalResourceStatus>emptyList())
.thenReturn(Collections.singletonList(rsrcStat1)) .thenReturn(Collections.singletonList(rsrc1success))
.thenReturn(Collections.singletonList(rsrcStat2)) .thenReturn(Collections.singletonList(rsrc2pending))
.thenReturn(Collections.<LocalResourceStatus>emptyList()); .thenReturn(rsrcs4);
String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE + String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
Path.SEPARATOR + "user0" + Path.SEPARATOR + Path.SEPARATOR + "user0" + Path.SEPARATOR +
ContainerLocalizer.FILECACHE; ContainerLocalizer.FILECACHE;
// get first resource // First heartbeat
LocalizerHeartbeatResponse response = spyService.heartbeat(stat); LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(1, response.getResourceSpecs().size()); assertEquals(1, response.getResourceSpecs().size());
@ -888,7 +912,7 @@ public class TestResourceLocalizationService {
assertTrue(localizedPath.getFile().endsWith( assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "10")); localPath + Path.SEPARATOR + "10"));
// get second resource // Second heartbeat
response = spyService.heartbeat(stat); response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(1, response.getResourceSpecs().size()); assertEquals(1, response.getResourceSpecs().size());
@ -902,16 +926,21 @@ public class TestResourceLocalizationService {
assertTrue(localizedPath.getFile().endsWith( assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11")); localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
// empty rsrc // Third heartbeat
response = spyService.heartbeat(stat); response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(0, response.getResourceSpecs().size()); assertEquals(1, response.getResourceSpecs().size());
assertEquals(req3, new LocalResourceRequest(response.getResourceSpecs()
.get(0).getResource()));
localizedPath =
response.getResourceSpecs().get(0).getDestinationDirectory();
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
// get shutdown // get shutdown
response = spyService.heartbeat(stat); response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
dispatcher.await(); dispatcher.await();
// verify container notification // verify container notification
ArgumentMatcher<ContainerEvent> matchesContainerLoc = ArgumentMatcher<ContainerEvent> matchesContainerLoc =
@ -923,8 +952,8 @@ public class TestResourceLocalizationService {
&& c.getContainerId() == evt.getContainerID(); && c.getContainerId() == evt.getContainerID();
} }
}; };
// total 2 resource localzation calls. one for each resource. // total 3 resource localzation calls. one for each resource.
verify(containerBus, times(2)).handle(argThat(matchesContainerLoc)); verify(containerBus, times(3)).handle(argThat(matchesContainerLoc));
// Verify deletion of localization token. // Verify deletion of localization token.
verify(delService).delete((String)isNull(), eq(localizationTokenPath)); verify(delService).delete((String)isNull(), eq(localizationTokenPath));