From 6ade6b5051fc16472ada1d5c73792999e74aa67c Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Sun, 26 Apr 2015 09:13:46 -0700 Subject: [PATCH] YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha) (cherry picked from commit 47279c3228185548ed09c36579b420225e4894f5) (cherry picked from commit 4045c41afe440b773d006e962bf8a5eae3fdc284) (cherry picked from commit 6f2cc0dfa8f21984ecdab59dc087ccf525934930) --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../container/ContainerImpl.java | 6 +++ .../ResourceLocalizationService.java | 53 ++++++++++++++----- .../event/LocalizationEventType.java | 1 + .../TestResourceLocalizationService.java | 22 ++++---- 5 files changed, 62 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e9c52173085..442614dad92 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -132,6 +132,9 @@ Release 2.6.1 - UNRELEASED YARN-3024. LocalizerRunner should give DIE action when all resources are localized. (Chengbing Liu via xgong) + YARN-3464. Race condition in LocalizerRunner kills localizer before + localizing all resources. (Zhihai Xu via kasha) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index fa54ee19b9e..eadc9a71197 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -58,7 +58,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; @@ -709,6 +711,10 @@ public class ContainerImpl implements Container { return ContainerState.LOCALIZING; } + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationEvent(LocalizationEventType. + CONTAINER_RESOURCES_LOCALIZED, container)); + container.sendLaunchEvent(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 08196a5ae83..d38bb7a7766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -49,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -107,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -388,6 +390,9 @@ public class ResourceLocalizationService extends CompositeService case INIT_CONTAINER_RESOURCES: handleInitContainerResources((ContainerLocalizationRequestEvent) event); break; + case CONTAINER_RESOURCES_LOCALIZED: + handleContainerResourcesLocalized((ContainerLocalizationEvent) event); + break; case CACHE_CLEANUP: handleCacheCleanup(event); break; @@ -450,7 +455,18 @@ public class ResourceLocalizationService extends CompositeService } } } - + + /** + * Once a container's resources are localized, kill the corresponding + * {@link ContainerLocalizer} + */ + private void handleContainerResourcesLocalized( + ContainerLocalizationEvent event) { + Container c = event.getContainer(); + String locId = ConverterUtils.toString(c.getContainerId()); + localizerTracker.endContainerLocalization(locId); + } + private void handleCacheCleanup(LocalizationEvent event) { ResourceRetentionSet retain = new ResourceRetentionSet(delService, cacheTargetSize); @@ -661,7 +677,7 @@ public class ResourceLocalizationService extends CompositeService response.setLocalizerAction(LocalizerAction.DIE); return response; } - return localizer.update(status.getResources()); + return localizer.processHeartbeat(status.getResources()); } } @@ -715,6 +731,17 @@ public class ResourceLocalizationService extends CompositeService localizer.interrupt(); } } + + public void endContainerLocalization(String locId) { + LocalizerRunner localizer; + synchronized (privLocalizers) { + localizer = privLocalizers.get(locId); + if (null == localizer) { + return; // ignore + } + } + localizer.endContainerLocalization(); + } } @@ -863,6 +890,7 @@ public class ResourceLocalizationService extends CompositeService final Map scheduled; // Its a shared list between Private Localizer and dispatcher thread. final List pending; + private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false); // TODO: threadsafe, use outer? private final RecordFactory recordFactory = @@ -883,6 +911,10 @@ public class ResourceLocalizationService extends CompositeService pending.add(request); } + public void endContainerLocalization() { + killContainerLocalizer.set(true); + } + /** * Find next resource to be given to a spawned localizer. * @@ -929,7 +961,7 @@ public class ResourceLocalizationService extends CompositeService } } - LocalizerHeartbeatResponse update( + LocalizerHeartbeatResponse processHeartbeat( List remoteResourceStatuses) { LocalizerHeartbeatResponse response = recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); @@ -938,7 +970,7 @@ public class ResourceLocalizationService extends CompositeService ApplicationId applicationId = context.getContainerId().getApplicationAttemptId().getApplicationId(); - LocalizerAction action = LocalizerAction.LIVE; + boolean fetchFailed = false; // Update resource statuses. for (LocalResourceStatus stat : remoteResourceStatuses) { LocalResource rsrc = stat.getResource(); @@ -974,7 +1006,7 @@ public class ResourceLocalizationService extends CompositeService case FETCH_FAILURE: final String diagnostics = stat.getException().toString(); LOG.warn(req + " failed: " + diagnostics); - response.setLocalizerAction(LocalizerAction.DIE); + fetchFailed = true; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, diagnostics)); @@ -986,15 +1018,15 @@ public class ResourceLocalizationService extends CompositeService break; default: LOG.info("Unknown status: " + stat.getStatus()); - action = LocalizerAction.DIE; + fetchFailed = true; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, stat.getException().getMessage())); break; } } - if (action == LocalizerAction.DIE) { - response.setLocalizerAction(action); + if (fetchFailed || killContainerLocalizer.get()) { + response.setLocalizerAction(LocalizerAction.DIE); return response; } @@ -1022,12 +1054,9 @@ public class ResourceLocalizationService extends CompositeService } catch (URISyntaxException e) { //TODO fail? Already translated several times... } - } else if (pending.isEmpty()) { - // TODO: Synchronization - action = LocalizerAction.DIE; } - response.setLocalizerAction(action); + response.setLocalizerAction(LocalizerAction.LIVE); response.setResourceSpecs(rsrcs); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java index 5134349d9c7..4785fba4229 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java @@ -23,4 +23,5 @@ public enum LocalizationEventType { CACHE_CLEANUP, CLEANUP_CONTAINER_RESOURCES, DESTROY_APPLICATION_RESOURCES, + CONTAINER_RESOURCES_LOCALIZED, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 0894dc68fd8..16901cb2b77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -22,28 +22,25 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyShort; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; @@ -63,12 +60,6 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.security.AccessControlException; -import org.junit.Assert; - import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; @@ -76,6 +67,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -128,6 +120,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -145,6 +138,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -942,7 +936,13 @@ public class TestResourceLocalizationService { assertTrue(localizedPath.getFile().endsWith( localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12")); - // get shutdown + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + + spyService.handle(new ContainerLocalizationEvent( + LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c)); + + // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event response = spyService.heartbeat(stat); assertEquals(LocalizerAction.DIE, response.getLocalizerAction());