MAPREDUCE-2949. svn merge -c r1170689 --ignore-ancestry ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1170690 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-14 15:38:29 +00:00
parent 90a5181501
commit e1bc3c3f7d
2 changed files with 19 additions and 8 deletions

View File

@ -1289,6 +1289,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2995. Fixed race condition in ContainerLauncher. (vinodkv via MAPREDUCE-2995. Fixed race condition in ContainerLauncher. (vinodkv via
acmurthy) acmurthy)
MAPREDUCE-2949. Fixed NodeManager to shut-down correctly if a service
startup fails. (Ravi Teja via vinodkv)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -107,9 +107,10 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
public class ResourceLocalizationService extends AbstractService public class ResourceLocalizationService extends CompositeService
implements EventHandler<LocalizationEvent>, LocalizationProtocol { implements EventHandler<LocalizationEvent>, LocalizationProtocol {
private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class); private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class);
@ -201,9 +202,8 @@ public void init(Configuration conf) {
localizationServerAddress = NetUtils.createSocketAddr( localizationServerAddress = NetUtils.createSocketAddr(
conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS)); conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
localizerTracker = createLocalizerTracker(conf); localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker);
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
super.init(conf); super.init(conf);
} }
@ -214,6 +214,8 @@ public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
@Override @Override
public void start() { public void start() {
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer(); server = createServer();
LOG.info("Localizer started on port " + server.getPort()); LOG.info("Localizer started on port " + server.getPort());
server.start(); server.start();
@ -247,9 +249,7 @@ public void stop() {
if (server != null) { if (server != null) {
server.close(); server.close();
} }
if (localizerTracker != null) { cacheCleanup.shutdown();
localizerTracker.stop();
}
super.stop(); super.stop();
} }
@ -403,7 +403,7 @@ LocalResourcesTracker getLocalResourcesTracker(
/** /**
* Sub-component handling the spawning of {@link ContainerLocalizer}s * Sub-component handling the spawning of {@link ContainerLocalizer}s
*/ */
class LocalizerTracker implements EventHandler<LocalizerEvent> { class LocalizerTracker extends AbstractService implements EventHandler<LocalizerEvent> {
private final PublicLocalizer publicLocalizer; private final PublicLocalizer publicLocalizer;
private final Map<String,LocalizerRunner> privLocalizers; private final Map<String,LocalizerRunner> privLocalizers;
@ -414,9 +414,15 @@ class LocalizerTracker implements EventHandler<LocalizerEvent> {
LocalizerTracker(Configuration conf, LocalizerTracker(Configuration conf,
Map<String,LocalizerRunner> privLocalizers) { Map<String,LocalizerRunner> privLocalizers) {
super(LocalizerTracker.class.getName());
this.publicLocalizer = new PublicLocalizer(conf); this.publicLocalizer = new PublicLocalizer(conf);
this.privLocalizers = privLocalizers; this.privLocalizers = privLocalizers;
}
@Override
public synchronized void start() {
publicLocalizer.start(); publicLocalizer.start();
super.start();
} }
public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) { public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
@ -436,11 +442,13 @@ public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
} }
} }
@Override
public void stop() { public void stop() {
for (LocalizerRunner localizer : privLocalizers.values()) { for (LocalizerRunner localizer : privLocalizers.values()) {
localizer.interrupt(); localizer.interrupt();
} }
publicLocalizer.interrupt(); publicLocalizer.interrupt();
super.stop();
} }
@Override @Override