diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 135aba975b9..21974f794c7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1289,6 +1289,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2995. Fixed race condition in ContainerLauncher. (vinodkv via acmurthy) + MAPREDUCE-2949. Fixed NodeManager to shut-down correctly if a service + startup fails. (Ravi Teja via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index d120b5ccdf9..ead650fab89 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -107,9 +107,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.util.ConverterUtils; -public class ResourceLocalizationService extends AbstractService +public class ResourceLocalizationService extends CompositeService implements EventHandler, LocalizationProtocol { private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class); @@ -201,9 +202,8 @@ public void init(Configuration conf) { localizationServerAddress = NetUtils.createSocketAddr( conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS)); localizerTracker = createLocalizerTracker(conf); + addService(localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker); - cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher), - cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS); super.init(conf); } @@ -214,6 +214,8 @@ public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) { @Override public void start() { + cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher), + cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS); server = createServer(); LOG.info("Localizer started on port " + server.getPort()); server.start(); @@ -247,9 +249,7 @@ public void stop() { if (server != null) { server.close(); } - if (localizerTracker != null) { - localizerTracker.stop(); - } + cacheCleanup.shutdown(); super.stop(); } @@ -403,7 +403,7 @@ LocalResourcesTracker getLocalResourcesTracker( /** * Sub-component handling the spawning of {@link ContainerLocalizer}s */ - class LocalizerTracker implements EventHandler { + class LocalizerTracker extends AbstractService implements EventHandler { private final PublicLocalizer publicLocalizer; private final Map privLocalizers; @@ -414,9 +414,15 @@ class LocalizerTracker implements EventHandler { LocalizerTracker(Configuration conf, Map privLocalizers) { + super(LocalizerTracker.class.getName()); this.publicLocalizer = new PublicLocalizer(conf); this.privLocalizers = privLocalizers; + } + + @Override + public synchronized void start() { publicLocalizer.start(); + super.start(); } public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) { @@ -435,12 +441,14 @@ public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) { return localizer.update(status.getResources()); } } - + + @Override public void stop() { for (LocalizerRunner localizer : privLocalizers.values()) { localizer.interrupt(); } publicLocalizer.interrupt(); + super.stop(); } @Override