YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)

This commit is contained in:
Karthik Kambatla 2015-04-26 09:13:46 -07:00
parent 22b70e7c5a
commit 47279c3228
5 changed files with 62 additions and 15 deletions

View File

@ -268,6 +268,9 @@ Release 2.8.0 - UNRELEASED
YARN-3537. NPE when NodeManager.serviceInit fails and stopRecoveryStore YARN-3537. NPE when NodeManager.serviceInit fails and stopRecoveryStore
invoked (Brahma Reddy Battula via jlowe) invoked (Brahma Reddy Battula via jlowe)
YARN-3464. Race condition in LocalizerRunner kills localizer before
localizing all resources. (Zhihai Xu via kasha)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@ -716,7 +718,12 @@ public class ContainerImpl implements Container {
return ContainerState.LOCALIZING; return ContainerState.LOCALIZING;
} }
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(LocalizationEventType.
CONTAINER_RESOURCES_LOCALIZED, container));
container.sendLaunchEvent(); container.sendLaunchEvent();
container.metrics.endInitingContainer();
// If this is a recovered container that has already launched, skip // If this is a recovered container that has already launched, skip
// uploading resources to the shared cache. We do this to avoid uploading // uploading resources to the shared cache. We do this to avoid uploading
@ -734,7 +741,6 @@ public class ContainerImpl implements Container {
SharedCacheUploadEventType.UPLOAD)); SharedCacheUploadEventType.UPLOAD));
} }
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED; return ContainerState.LOCALIZED;
} }
} }

View File

@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -108,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@ -389,6 +391,9 @@ public class ResourceLocalizationService extends CompositeService
case INIT_CONTAINER_RESOURCES: case INIT_CONTAINER_RESOURCES:
handleInitContainerResources((ContainerLocalizationRequestEvent) event); handleInitContainerResources((ContainerLocalizationRequestEvent) event);
break; break;
case CONTAINER_RESOURCES_LOCALIZED:
handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
break;
case CACHE_CLEANUP: case CACHE_CLEANUP:
handleCacheCleanup(event); handleCacheCleanup(event);
break; break;
@ -456,6 +461,17 @@ public class ResourceLocalizationService extends CompositeService
} }
} }
/**
 * Handles a {@code CONTAINER_RESOURCES_LOCALIZED} event.
 *
 * <p>The localizer id is the string form of the event's container id. It is
 * passed to the localizer tracker so that the corresponding
 * {@link ContainerLocalizer} can be killed now that the container's
 * resources are localized.
 *
 * @param event the localization event carrying the container whose
 *              localization has completed
 */
private void handleContainerResourcesLocalized(
    ContainerLocalizationEvent event) {
  final String localizerId =
      ConverterUtils.toString(event.getContainer().getContainerId());
  localizerTracker.endContainerLocalization(localizerId);
}
private void handleCacheCleanup(LocalizationEvent event) { private void handleCacheCleanup(LocalizationEvent event) {
ResourceRetentionSet retain = ResourceRetentionSet retain =
new ResourceRetentionSet(delService, cacheTargetSize); new ResourceRetentionSet(delService, cacheTargetSize);
@ -670,7 +686,7 @@ public class ResourceLocalizationService extends CompositeService
response.setLocalizerAction(LocalizerAction.DIE); response.setLocalizerAction(LocalizerAction.DIE);
return response; return response;
} }
return localizer.update(status.getResources()); return localizer.processHeartbeat(status.getResources());
} }
} }
@ -724,6 +740,17 @@ public class ResourceLocalizationService extends CompositeService
localizer.interrupt(); localizer.interrupt();
} }
} }
/**
 * Signals the private localizer registered under {@code locId} that its
 * container's localization is finished.
 *
 * <p>The lookup in {@code privLocalizers} is done while holding that map's
 * monitor; the runner itself is notified after the monitor is released.
 * An unknown {@code locId} is silently ignored.
 *
 * @param locId the localizer id to look up
 */
public void endContainerLocalization(String locId) {
  final LocalizerRunner runner;
  synchronized (privLocalizers) {
    runner = privLocalizers.get(locId);
  }
  if (runner != null) {
    runner.endContainerLocalization();
  }
  // else: no localizer is registered for this id -- ignore
}
} }
@ -878,6 +905,7 @@ public class ResourceLocalizationService extends CompositeService
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled; final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
// Its a shared list between Private Localizer and dispatcher thread. // Its a shared list between Private Localizer and dispatcher thread.
final List<LocalizerResourceRequestEvent> pending; final List<LocalizerResourceRequestEvent> pending;
private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);
// TODO: threadsafe, use outer? // TODO: threadsafe, use outer?
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
@ -898,6 +926,10 @@ public class ResourceLocalizationService extends CompositeService
pending.add(request); pending.add(request);
} }
/**
 * Marks this localizer as no longer needed by setting the
 * {@code killContainerLocalizer} flag to {@code true}.
 *
 * <p>The flag is only recorded here; nothing is interrupted or stopped by
 * this call. The heartbeat-processing code in this file reads the flag and
 * answers with {@code LocalizerAction.DIE} once it is set.
 */
public void endContainerLocalization() {
killContainerLocalizer.set(true);
}
/** /**
* Find next resource to be given to a spawned localizer. * Find next resource to be given to a spawned localizer.
* *
@ -944,7 +976,7 @@ public class ResourceLocalizationService extends CompositeService
} }
} }
LocalizerHeartbeatResponse update( LocalizerHeartbeatResponse processHeartbeat(
List<LocalResourceStatus> remoteResourceStatuses) { List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response = LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
@ -953,7 +985,7 @@ public class ResourceLocalizationService extends CompositeService
ApplicationId applicationId = ApplicationId applicationId =
context.getContainerId().getApplicationAttemptId().getApplicationId(); context.getContainerId().getApplicationAttemptId().getApplicationId();
LocalizerAction action = LocalizerAction.LIVE; boolean fetchFailed = false;
// Update resource statuses. // Update resource statuses.
for (LocalResourceStatus stat : remoteResourceStatuses) { for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource(); LocalResource rsrc = stat.getResource();
@ -989,7 +1021,7 @@ public class ResourceLocalizationService extends CompositeService
case FETCH_FAILURE: case FETCH_FAILURE:
final String diagnostics = stat.getException().toString(); final String diagnostics = stat.getException().toString();
LOG.warn(req + " failed: " + diagnostics); LOG.warn(req + " failed: " + diagnostics);
action = LocalizerAction.DIE; fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId) getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent( .handle(new ResourceFailedLocalizationEvent(
req, diagnostics)); req, diagnostics));
@ -1001,15 +1033,15 @@ public class ResourceLocalizationService extends CompositeService
break; break;
default: default:
LOG.info("Unknown status: " + stat.getStatus()); LOG.info("Unknown status: " + stat.getStatus());
action = LocalizerAction.DIE; fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId) getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent( .handle(new ResourceFailedLocalizationEvent(
req, stat.getException().getMessage())); req, stat.getException().getMessage()));
break; break;
} }
} }
if (action == LocalizerAction.DIE) { if (fetchFailed || killContainerLocalizer.get()) {
response.setLocalizerAction(action); response.setLocalizerAction(LocalizerAction.DIE);
return response; return response;
} }
@ -1037,12 +1069,9 @@ public class ResourceLocalizationService extends CompositeService
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
//TODO fail? Already translated several times... //TODO fail? Already translated several times...
} }
} else if (pending.isEmpty()) {
// TODO: Synchronization
action = LocalizerAction.DIE;
} }
response.setLocalizerAction(action); response.setLocalizerAction(LocalizerAction.LIVE);
response.setResourceSpecs(rsrcs); response.setResourceSpecs(rsrcs);
return response; return response;
} }

View File

@ -23,4 +23,5 @@ public enum LocalizationEventType {
CACHE_CLEANUP, CACHE_CLEANUP,
CLEANUP_CONTAINER_RESOURCES, CLEANUP_CONTAINER_RESOURCES,
DESTROY_APPLICATION_RESOURCES, DESTROY_APPLICATION_RESOURCES,
CONTAINER_RESOURCES_LOCALIZED,
} }

View File

@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@ -975,7 +976,8 @@ public class TestResourceLocalizationService {
.thenReturn(Collections.<LocalResourceStatus>emptyList()) .thenReturn(Collections.<LocalResourceStatus>emptyList())
.thenReturn(Collections.singletonList(rsrc1success)) .thenReturn(Collections.singletonList(rsrc1success))
.thenReturn(Collections.singletonList(rsrc2pending)) .thenReturn(Collections.singletonList(rsrc2pending))
.thenReturn(rsrcs4); .thenReturn(rsrcs4)
.thenReturn(Collections.<LocalResourceStatus>emptyList());
String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE + String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
Path.SEPARATOR + "user0" + Path.SEPARATOR + Path.SEPARATOR + "user0" + Path.SEPARATOR +
@ -1019,7 +1021,13 @@ public class TestResourceLocalizationService {
assertTrue(localizedPath.getFile().endsWith( assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12")); localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
// get shutdown response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
spyService.handle(new ContainerLocalizationEvent(
LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));
// get shutdown after receiving the CONTAINER_RESOURCES_LOCALIZED event
response = spyService.heartbeat(stat); response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); assertEquals(LocalizerAction.DIE, response.getLocalizerAction());