diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7e379b073a6..7a436f9d0a6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -92,6 +92,8 @@ Release 2.0.3-alpha - Unreleased YARN-315. Using the common security token protobuf definition from hadoop common. (Suresh Srinivas via vinodkv) + YARN-170. Change NodeManager stop to be reentrant. (Sandy Ryza via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 51b81f25f8b..7d370cdd9f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -54,11 +56,10 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; -import org.apache.hadoop.yarn.service.ServiceStateChangeListener; import org.apache.hadoop.yarn.util.Records; -public class NodeManager extends CompositeService implements - ServiceStateChangeListener { +public class NodeManager extends CompositeService + implements EventHandler { /** * Priority of the NodeManager shutdown hook. @@ -82,6 +83,8 @@ public class NodeManager extends CompositeService implements private long waitForContainersOnShutdownMillis; + private AtomicBoolean isStopping = new AtomicBoolean(false); + public NodeManager() { super(NodeManager.class.getName()); } @@ -152,7 +155,6 @@ public class NodeManager extends CompositeService implements NodeStatusUpdater nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); - nodeStatusUpdater.register(this); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); @@ -167,6 +169,7 @@ public class NodeManager extends CompositeService implements addService(webServer); dispatcher.register(ContainerManagerEventType.class, containerManager); + dispatcher.register(NodeManagerEventType.class, this); addService(dispatcher); DefaultMetricsSystem.initialize("NodeManager"); @@ -198,13 +201,17 @@ public class NodeManager extends CompositeService implements @Override public void stop() { + if (isStopping.getAndSet(true)) { + return; + } + cleanupContainers(); super.stop(); DefaultMetricsSystem.shutdown(); } @SuppressWarnings("unchecked") - private void cleanupContainers() { + protected void cleanupContainers() { Map containers = context.getContainers(); if (containers.isEmpty()) { return; @@ -293,24 +300,10 @@ public class NodeManager extends CompositeService implements return nodeHealthChecker; } - @Override - public void stateChanged(Service service) { - if (NodeStatusUpdaterImpl.class.getName().equals(service.getName()) - && STATE.STOPPED.equals(service.getServiceState())) { - - boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode(); - - // Shutdown the Nodemanager when the NodeStatusUpdater is stopped. - stop(); - - // Reboot the whole node-manager if NodeStatusUpdater got a reboot command - // from the RM. - if (hasToReboot) { - LOG.info("Rebooting the node manager."); - NodeManager nodeManager = createNewNodeManager(); - nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot); - } - } + private void reboot() { + LOG.info("Rebooting the node manager."); + NodeManager nodeManager = createNewNodeManager(); + nodeManager.initAndStartNodeManager(this.getConfig(), true); } private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { @@ -333,6 +326,21 @@ public class NodeManager extends CompositeService implements } } + @Override + public void handle(NodeManagerEvent event) { + switch (event.getType()) { + case SHUTDOWN: + stop(); + break; + case REBOOT: + stop(); + reboot(); + break; + default: + LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); + } + } + // For testing NodeManager createNewNodeManager() { return new NodeManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEvent.java new file mode 100644 index 00000000000..d6ac28361f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class NodeManagerEvent extends + AbstractEvent{ + + public NodeManagerEvent(NodeManagerEventType type) { + super(type); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java new file mode 100644 index 00000000000..d18cec6c0fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +public enum NodeManagerEventType { + SHUTDOWN, REBOOT +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 22ec10a5a8e..ae9ce0b4bdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -88,8 +88,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - private boolean hasToRebootNode; - public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -149,18 +147,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements super.stop(); } - private synchronized void reboot() { - this.hasToRebootNode = true; - // Stop the status-updater. This will trigger a sub-service state change in - // the NodeManager which will then decide to reboot or not based on - // isRebooted. - this.stop(); - } - - synchronized boolean hasToRebootNode() { - return this.hasToRebootNode; - } - private boolean isSecurityEnabled() { return UserGroupInformation.isSecurityEnabled(); } @@ -348,13 +334,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," + " hence shutting down."); - NodeStatusUpdaterImpl.this.stop(); + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); break; } if (response.getNodeAction() == NodeAction.REBOOT) { LOG.info("Node is out of sync with ResourceManager," + " hence rebooting."); - NodeStatusUpdaterImpl.this.reboot(); + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.REBOOT)); break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 41d171f97c0..d65b096bbe8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,7 +98,7 @@ public class TestNodeStatusUpdater { public void tearDown() { this.registeredNodes.clear(); heartBeatID = 0; - if (nm != null) { + if (nm != null && nm.getServiceState() == STATE.STARTED) { nm.stop(); } DefaultMetricsSystem.shutdown(); @@ -446,6 +447,52 @@ public class TestNodeStatusUpdater { nm.stop(); } + @Test + public void testStopReentrant() throws Exception { + final AtomicInteger numCleanups = new AtomicInteger(0); + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( + context, dispatcher, healthChecker, metrics); + MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); + myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN; + myNodeStatusUpdater.resourceTracker = myResourceTracker2; + return myNodeStatusUpdater; + } + + @Override + protected void cleanupContainers() { + super.cleanupContainers(); + numCleanups.incrementAndGet(); + } + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + int waitCount = 0; + while (heartBeatID < 1 && waitCount++ != 20) { + Thread.sleep(500); + } + Assert.assertFalse(heartBeatID < 1); + + // Meanwhile call stop directly as the shutdown hook would + nm.stop(); + + // NM takes a while to reach the STOPPED state. + waitCount = 0; + while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { + LOG.info("Waiting for NM to stop.."); + Thread.sleep(1000); + } + + Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); + Assert.assertEquals(numCleanups.get(), 1); + } + @Test public void testNodeDecommision() throws Exception { nm = getNodeManager(NodeAction.SHUTDOWN);