YARN-170. Change NodeManager stop to be reentrant. Contributed by Sandy Ryza.
svn merge --ignore-ancestry -c 1429796 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1429797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9a87b2f7fe
commit
6a52f90017
|
@ -75,6 +75,8 @@ Release 2.0.3-alpha - Unreleased
|
|||
YARN-315. Using the common security token protobuf definition from hadoop
|
||||
common. (Suresh Srinivas via vinodkv)
|
||||
|
||||
YARN-170. Change NodeManager stop to be reentrant. (Sandy Ryza via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
|
@ -54,11 +56,10 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
|
|||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
public class NodeManager extends CompositeService implements
|
||||
ServiceStateChangeListener {
|
||||
public class NodeManager extends CompositeService
|
||||
implements EventHandler<NodeManagerEvent> {
|
||||
|
||||
/**
|
||||
* Priority of the NodeManager shutdown hook.
|
||||
|
@ -82,6 +83,8 @@ public class NodeManager extends CompositeService implements
|
|||
|
||||
private long waitForContainersOnShutdownMillis;
|
||||
|
||||
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
||||
|
||||
public NodeManager() {
|
||||
super(NodeManager.class.getName());
|
||||
}
|
||||
|
@ -152,7 +155,6 @@ public class NodeManager extends CompositeService implements
|
|||
|
||||
NodeStatusUpdater nodeStatusUpdater =
|
||||
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
||||
nodeStatusUpdater.register(this);
|
||||
|
||||
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
||||
addService(nodeResourceMonitor);
|
||||
|
@ -167,6 +169,7 @@ public class NodeManager extends CompositeService implements
|
|||
addService(webServer);
|
||||
|
||||
dispatcher.register(ContainerManagerEventType.class, containerManager);
|
||||
dispatcher.register(NodeManagerEventType.class, this);
|
||||
addService(dispatcher);
|
||||
|
||||
DefaultMetricsSystem.initialize("NodeManager");
|
||||
|
@ -198,13 +201,17 @@ public class NodeManager extends CompositeService implements
|
|||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (isStopping.getAndSet(true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
cleanupContainers();
|
||||
super.stop();
|
||||
DefaultMetricsSystem.shutdown();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void cleanupContainers() {
|
||||
protected void cleanupContainers() {
|
||||
Map<ContainerId, Container> containers = context.getContainers();
|
||||
if (containers.isEmpty()) {
|
||||
return;
|
||||
|
@ -293,24 +300,10 @@ public class NodeManager extends CompositeService implements
|
|||
return nodeHealthChecker;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stateChanged(Service service) {
|
||||
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
|
||||
&& STATE.STOPPED.equals(service.getServiceState())) {
|
||||
|
||||
boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
|
||||
|
||||
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
|
||||
stop();
|
||||
|
||||
// Reboot the whole node-manager if NodeStatusUpdater got a reboot command
|
||||
// from the RM.
|
||||
if (hasToReboot) {
|
||||
private void reboot() {
|
||||
LOG.info("Rebooting the node manager.");
|
||||
NodeManager nodeManager = createNewNodeManager();
|
||||
nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot);
|
||||
}
|
||||
}
|
||||
nodeManager.initAndStartNodeManager(this.getConfig(), true);
|
||||
}
|
||||
|
||||
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
|
||||
|
@ -333,6 +326,21 @@ public class NodeManager extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(NodeManagerEvent event) {
|
||||
switch (event.getType()) {
|
||||
case SHUTDOWN:
|
||||
stop();
|
||||
break;
|
||||
case REBOOT:
|
||||
stop();
|
||||
reboot();
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
|
||||
}
|
||||
}
|
||||
|
||||
// For testing
|
||||
NodeManager createNewNodeManager() {
|
||||
return new NodeManager();
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
|
||||
public class NodeManagerEvent extends
|
||||
AbstractEvent<NodeManagerEventType>{
|
||||
|
||||
public NodeManagerEvent(NodeManagerEventType type) {
|
||||
super(type);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
public enum NodeManagerEventType {
|
||||
SHUTDOWN, REBOOT
|
||||
}
|
|
@ -88,8 +88,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
private final NodeHealthCheckerService healthChecker;
|
||||
private final NodeManagerMetrics metrics;
|
||||
|
||||
private boolean hasToRebootNode;
|
||||
|
||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||
super(NodeStatusUpdaterImpl.class.getName());
|
||||
|
@ -149,18 +147,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
super.stop();
|
||||
}
|
||||
|
||||
private synchronized void reboot() {
|
||||
this.hasToRebootNode = true;
|
||||
// Stop the status-updater. This will trigger a sub-service state change in
|
||||
// the NodeManager which will then decide to reboot or not based on
|
||||
// isRebooted.
|
||||
this.stop();
|
||||
}
|
||||
|
||||
synchronized boolean hasToRebootNode() {
|
||||
return this.hasToRebootNode;
|
||||
}
|
||||
|
||||
private boolean isSecurityEnabled() {
|
||||
return UserGroupInformation.isSecurityEnabled();
|
||||
}
|
||||
|
@ -348,13 +334,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
LOG
|
||||
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
|
||||
" hence shutting down.");
|
||||
NodeStatusUpdaterImpl.this.stop();
|
||||
dispatcher.getEventHandler().handle(
|
||||
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
||||
break;
|
||||
}
|
||||
if (response.getNodeAction() == NodeAction.REBOOT) {
|
||||
LOG.info("Node is out of sync with ResourceManager,"
|
||||
+ " hence rebooting.");
|
||||
NodeStatusUpdaterImpl.this.reboot();
|
||||
dispatcher.getEventHandler().handle(
|
||||
new NodeManagerEvent(NodeManagerEventType.REBOOT));
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -97,7 +98,7 @@ public class TestNodeStatusUpdater {
|
|||
public void tearDown() {
|
||||
this.registeredNodes.clear();
|
||||
heartBeatID = 0;
|
||||
if (nm != null) {
|
||||
if (nm != null && nm.getServiceState() == STATE.STARTED) {
|
||||
nm.stop();
|
||||
}
|
||||
DefaultMetricsSystem.shutdown();
|
||||
|
@ -446,6 +447,52 @@ public class TestNodeStatusUpdater {
|
|||
nm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStopReentrant() throws Exception {
|
||||
final AtomicInteger numCleanups = new AtomicInteger(0);
|
||||
nm = new NodeManager() {
|
||||
@Override
|
||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
|
||||
context, dispatcher, healthChecker, metrics);
|
||||
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
|
||||
myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN;
|
||||
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
|
||||
return myNodeStatusUpdater;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanupContainers() {
|
||||
super.cleanupContainers();
|
||||
numCleanups.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
YarnConfiguration conf = createNMConfig();
|
||||
nm.init(conf);
|
||||
nm.start();
|
||||
|
||||
int waitCount = 0;
|
||||
while (heartBeatID < 1 && waitCount++ != 20) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
Assert.assertFalse(heartBeatID < 1);
|
||||
|
||||
// Meanwhile call stop directly as the shutdown hook would
|
||||
nm.stop();
|
||||
|
||||
// NM takes a while to reach the STOPPED state.
|
||||
waitCount = 0;
|
||||
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
|
||||
LOG.info("Waiting for NM to stop..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
||||
Assert.assertEquals(numCleanups.get(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeDecommision() throws Exception {
|
||||
nm = getNodeManager(NodeAction.SHUTDOWN);
|
||||
|
|
Loading…
Reference in New Issue