YARN-495. Changed NM reboot behaviour to be a simple resync - kill all containers and re-register with RM. Contributed by Jian He.
svn merge --ignore-ancestry -c 1466752 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1466753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
71a4677b36
commit
81a684ebd3
|
@ -71,6 +71,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-479. NM retry behavior for connection to RM should be similar for
|
YARN-479. NM retry behavior for connection to RM should be similar for
|
||||||
lost heartbeats (Jian He via bikas)
|
lost heartbeats (Jian He via bikas)
|
||||||
|
|
||||||
|
YARN-495. Changed NM reboot behaviour to be a simple resync - kill all
|
||||||
|
containers and re-register with RM. (Jian He via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -24,5 +24,5 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public enum NodeAction {
|
public enum NodeAction {
|
||||||
NORMAL, REBOOT, SHUTDOWN
|
NORMAL, RESYNC, SHUTDOWN
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ import "yarn_protos.proto";
|
||||||
|
|
||||||
enum NodeActionProto {
|
enum NodeActionProto {
|
||||||
NORMAL = 0;
|
NORMAL = 0;
|
||||||
REBOOT = 1;
|
RESYNC = 1;
|
||||||
SHUTDOWN = 2;
|
SHUTDOWN = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,6 +81,7 @@ public class NodeManager extends CompositeService
|
||||||
private Context context;
|
private Context context;
|
||||||
private AsyncDispatcher dispatcher;
|
private AsyncDispatcher dispatcher;
|
||||||
private ContainerManagerImpl containerManager;
|
private ContainerManagerImpl containerManager;
|
||||||
|
private NodeStatusUpdater nodeStatusUpdater;
|
||||||
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
||||||
|
|
||||||
private long waitForContainersOnShutdownMillis;
|
private long waitForContainersOnShutdownMillis;
|
||||||
|
@ -163,7 +164,7 @@ public void init(Configuration conf) {
|
||||||
addService(nodeHealthChecker);
|
addService(nodeHealthChecker);
|
||||||
dirsHandler = nodeHealthChecker.getDiskHandler();
|
dirsHandler = nodeHealthChecker.getDiskHandler();
|
||||||
|
|
||||||
NodeStatusUpdater nodeStatusUpdater =
|
nodeStatusUpdater =
|
||||||
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
||||||
|
|
||||||
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
||||||
|
@ -214,35 +215,67 @@ public void stop() {
|
||||||
if (isStopping.getAndSet(true)) {
|
if (isStopping.getAndSet(true)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupContainers();
|
cleanupContainers(NodeManagerEventType.SHUTDOWN);
|
||||||
super.stop();
|
super.stop();
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void cleanupContainersOnResync() {
|
||||||
|
//we do not want to block dispatcher thread here
|
||||||
|
new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
cleanupContainers(NodeManagerEventType.RESYNC);
|
||||||
|
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
|
||||||
|
}
|
||||||
|
}.start();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void cleanupContainers() {
|
protected void cleanupContainers(NodeManagerEventType eventType) {
|
||||||
Map<ContainerId, Container> containers = context.getContainers();
|
Map<ContainerId, Container> containers = context.getContainers();
|
||||||
if (containers.isEmpty()) {
|
if (containers.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("Containers still running on shutdown: " + containers.keySet());
|
LOG.info("Containers still running on " + eventType + " : "
|
||||||
|
+ containers.keySet());
|
||||||
|
|
||||||
List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.keySet());
|
List<ContainerId> containerIds =
|
||||||
|
new ArrayList<ContainerId>(containers.keySet());
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new CMgrCompletedContainersEvent(containerIds,
|
new CMgrCompletedContainersEvent(containerIds,
|
||||||
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
|
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
|
||||||
|
|
||||||
LOG.info("Waiting for containers to be killed");
|
LOG.info("Waiting for containers to be killed");
|
||||||
|
|
||||||
long waitStartTime = System.currentTimeMillis();
|
switch (eventType) {
|
||||||
while (!containers.isEmpty() &&
|
case SHUTDOWN:
|
||||||
System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
|
long waitStartTime = System.currentTimeMillis();
|
||||||
try {
|
while (!containers.isEmpty()
|
||||||
Thread.sleep(1000);
|
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
|
||||||
} catch (InterruptedException ex) {
|
try {
|
||||||
LOG.warn("Interrupted while sleeping on container kill", ex);
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
LOG.warn("Interrupted while sleeping on container kill on shutdown",
|
||||||
|
ex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
case RESYNC:
|
||||||
|
while (!containers.isEmpty()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
//to remove done containers from the map
|
||||||
|
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
LOG.warn("Interrupted while sleeping on container kill on resync",
|
||||||
|
ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.warn("Invalid eventType: " + eventType);
|
||||||
}
|
}
|
||||||
|
|
||||||
// All containers killed
|
// All containers killed
|
||||||
|
@ -342,9 +375,8 @@ public void handle(NodeManagerEvent event) {
|
||||||
case SHUTDOWN:
|
case SHUTDOWN:
|
||||||
stop();
|
stop();
|
||||||
break;
|
break;
|
||||||
case REBOOT:
|
case RESYNC:
|
||||||
stop();
|
cleanupContainersOnResync();
|
||||||
reboot();
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
|
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
|
||||||
|
@ -361,6 +393,11 @@ ContainerManagerImpl getContainerManager() {
|
||||||
return containerManager;
|
return containerManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//For testing
|
||||||
|
Dispatcher getNMDispatcher(){
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Context getNMContext() {
|
Context getNMContext() {
|
||||||
return this.context;
|
return this.context;
|
||||||
|
|
|
@ -18,5 +18,5 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
public enum NodeManagerEventType {
|
public enum NodeManagerEventType {
|
||||||
SHUTDOWN, REBOOT
|
SHUTDOWN, RESYNC
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.service.Service;
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
|
|
||||||
public interface NodeStatusUpdater extends Service {
|
public interface NodeStatusUpdater extends Service {
|
||||||
|
|
||||||
void sendOutofBandHeartBeat();
|
void sendOutofBandHeartBeat();
|
||||||
|
NodeStatus getNodeStatusAndUpdateContainersInContext();
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,8 @@
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
public class NodeStatusUpdaterImpl extends AbstractService implements
|
public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
NodeStatusUpdater {
|
NodeStatusUpdater {
|
||||||
|
|
||||||
|
@ -91,6 +93,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
private long rmConnectionRetryIntervalMS;
|
private long rmConnectionRetryIntervalMS;
|
||||||
private boolean waitForEver;
|
private boolean waitForEver;
|
||||||
|
|
||||||
|
private Runnable statusUpdaterRunnable;
|
||||||
|
private Thread statusUpdater;
|
||||||
|
|
||||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||||
super(NodeStatusUpdaterImpl.class.getName());
|
super(NodeStatusUpdaterImpl.class.getName());
|
||||||
|
@ -169,6 +174,22 @@ public synchronized void stop() {
|
||||||
this.isStopped = true;
|
this.isStopped = true;
|
||||||
super.stop();
|
super.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void rebootNodeStatusUpdater() {
|
||||||
|
// Interrupt the updater.
|
||||||
|
this.isStopped = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
statusUpdater.join();
|
||||||
|
registerWithRM();
|
||||||
|
statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
|
||||||
|
this.isStopped = false;
|
||||||
|
statusUpdater.start();
|
||||||
|
LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new AvroRuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isSecurityEnabled() {
|
private boolean isSecurityEnabled() {
|
||||||
return UserGroupInformation.isSecurityEnabled();
|
return UserGroupInformation.isSecurityEnabled();
|
||||||
|
@ -188,7 +209,8 @@ protected ResourceTracker getRMClient() {
|
||||||
conf);
|
conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerWithRM() throws YarnRemoteException {
|
@VisibleForTesting
|
||||||
|
protected void registerWithRM() throws YarnRemoteException {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
rmConnectWaitMS =
|
rmConnectWaitMS =
|
||||||
conf.getInt(
|
conf.getInt(
|
||||||
|
@ -312,7 +334,7 @@ private List<ApplicationId> createKeepAliveApplicationList() {
|
||||||
return appList;
|
return appList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private NodeStatus getNodeStatus() {
|
public NodeStatus getNodeStatusAndUpdateContainersInContext() {
|
||||||
|
|
||||||
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
|
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
|
||||||
nodeStatus.setNodeId(this.nodeId);
|
nodeStatus.setNodeId(this.nodeId);
|
||||||
|
@ -387,7 +409,7 @@ public void sendOutofBandHeartBeat() {
|
||||||
|
|
||||||
protected void startStatusUpdater() {
|
protected void startStatusUpdater() {
|
||||||
|
|
||||||
new Thread("Node Status Updater") {
|
statusUpdaterRunnable = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -398,7 +420,7 @@ public void run() {
|
||||||
NodeHeartbeatResponse response = null;
|
NodeHeartbeatResponse response = null;
|
||||||
int rmRetryCount = 0;
|
int rmRetryCount = 0;
|
||||||
long waitStartTime = System.currentTimeMillis();
|
long waitStartTime = System.currentTimeMillis();
|
||||||
NodeStatus nodeStatus = getNodeStatus();
|
NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
|
||||||
nodeStatus.setResponseId(lastHeartBeatID);
|
nodeStatus.setResponseId(lastHeartBeatID);
|
||||||
|
|
||||||
NodeHeartbeatRequest request = recordFactory
|
NodeHeartbeatRequest request = recordFactory
|
||||||
|
@ -453,11 +475,11 @@ public void run() {
|
||||||
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (response.getNodeAction() == NodeAction.REBOOT) {
|
if (response.getNodeAction() == NodeAction.RESYNC) {
|
||||||
LOG.info("Node is out of sync with ResourceManager,"
|
LOG.info("Node is out of sync with ResourceManager,"
|
||||||
+ " hence rebooting.");
|
+ " hence rebooting.");
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new NodeManagerEvent(NodeManagerEventType.REBOOT));
|
new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,6 +522,9 @@ public void run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.start();
|
};
|
||||||
|
statusUpdater =
|
||||||
|
new Thread(statusUpdaterRunnable, "Node Status Updater");
|
||||||
|
statusUpdater.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -160,7 +160,10 @@ public void testClearLocalDirWhenNodeReboot() throws IOException {
|
||||||
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||||
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
|
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
|
||||||
|
|
||||||
nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
|
// restart the NodeManager
|
||||||
|
nm.stop();
|
||||||
|
nm = new MyNodeManager();
|
||||||
|
nm.start();
|
||||||
|
|
||||||
numTries = 0;
|
numTries = 0;
|
||||||
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
|
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
|
||||||
|
@ -250,26 +253,6 @@ protected DeletionService createDeletionService(ContainerExecutor exec) {
|
||||||
return delService;
|
return delService;
|
||||||
}
|
}
|
||||||
|
|
||||||
// mimic part of reboot process
|
|
||||||
@Override
|
|
||||||
public void handle(NodeManagerEvent event) {
|
|
||||||
switch (event.getType()) {
|
|
||||||
case SHUTDOWN:
|
|
||||||
this.stop();
|
|
||||||
break;
|
|
||||||
case REBOOT:
|
|
||||||
this.stop();
|
|
||||||
this.createNewMyNodeManager().start();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private MyNodeManager createNewMyNodeManager() {
|
|
||||||
return new MyNodeManager();
|
|
||||||
}
|
|
||||||
|
|
||||||
private YarnConfiguration createNMConfig() {
|
private YarnConfiguration createNMConfig() {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
|
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
|
||||||
|
|
|
@ -28,6 +28,9 @@
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
@ -49,9 +52,12 @@
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -71,6 +77,7 @@ public class TestNodeManagerShutdown {
|
||||||
.getRecordFactory(null);
|
.getRecordFactory(null);
|
||||||
static final String user = "nobody";
|
static final String user = "nobody";
|
||||||
private FileContext localFS;
|
private FileContext localFS;
|
||||||
|
private CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws UnsupportedFileSystemException {
|
public void setup() throws UnsupportedFileSystemException {
|
||||||
|
@ -91,52 +98,7 @@ public void testKillContainersOnShutdown() throws IOException {
|
||||||
NodeManager nm = getNodeManager();
|
NodeManager nm = getNodeManager();
|
||||||
nm.init(createNMConfig());
|
nm.init(createNMConfig());
|
||||||
nm.start();
|
nm.start();
|
||||||
|
startContainers(nm);
|
||||||
ContainerManagerImpl containerManager = nm.getContainerManager();
|
|
||||||
File scriptFile = createUnhaltingScriptFile();
|
|
||||||
|
|
||||||
ContainerLaunchContext containerLaunchContext =
|
|
||||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
||||||
|
|
||||||
// Construct the Container-id
|
|
||||||
ContainerId cId = createContainerId();
|
|
||||||
containerLaunchContext.setContainerId(cId);
|
|
||||||
|
|
||||||
containerLaunchContext.setUser(user);
|
|
||||||
|
|
||||||
URL localResourceUri =
|
|
||||||
ConverterUtils.getYarnUrlFromPath(localFS
|
|
||||||
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
|
||||||
LocalResource localResource =
|
|
||||||
recordFactory.newRecordInstance(LocalResource.class);
|
|
||||||
localResource.setResource(localResourceUri);
|
|
||||||
localResource.setSize(-1);
|
|
||||||
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
||||||
localResource.setType(LocalResourceType.FILE);
|
|
||||||
localResource.setTimestamp(scriptFile.lastModified());
|
|
||||||
String destinationFile = "dest_file";
|
|
||||||
Map<String, LocalResource> localResources =
|
|
||||||
new HashMap<String, LocalResource>();
|
|
||||||
localResources.put(destinationFile, localResource);
|
|
||||||
containerLaunchContext.setLocalResources(localResources);
|
|
||||||
containerLaunchContext.setUser(containerLaunchContext.getUser());
|
|
||||||
List<String> commands = new ArrayList<String>();
|
|
||||||
commands.add("/bin/bash");
|
|
||||||
commands.add(scriptFile.getAbsolutePath());
|
|
||||||
containerLaunchContext.setCommands(commands);
|
|
||||||
containerLaunchContext.setResource(recordFactory
|
|
||||||
.newRecordInstance(Resource.class));
|
|
||||||
containerLaunchContext.getResource().setMemory(1024);
|
|
||||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
|
||||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
|
||||||
containerManager.startContainer(startRequest);
|
|
||||||
|
|
||||||
GetContainerStatusRequest request =
|
|
||||||
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
|
|
||||||
request.setContainerId(cId);
|
|
||||||
ContainerStatus containerStatus =
|
|
||||||
containerManager.getContainerStatus(request).getStatus();
|
|
||||||
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
|
|
||||||
|
|
||||||
final int MAX_TRIES=20;
|
final int MAX_TRIES=20;
|
||||||
int numTries = 0;
|
int numTries = 0;
|
||||||
|
@ -170,6 +132,74 @@ public void testKillContainersOnShutdown() throws IOException {
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testKillContainersOnResync() throws IOException, InterruptedException {
|
||||||
|
NodeManager nm = new TestNodeManager();
|
||||||
|
YarnConfiguration conf = createNMConfig();
|
||||||
|
nm.init(conf);
|
||||||
|
nm.start();
|
||||||
|
startContainers(nm);
|
||||||
|
|
||||||
|
assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
|
||||||
|
nm.getNMDispatcher().getEventHandler().
|
||||||
|
handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
||||||
|
try {
|
||||||
|
syncBarrier.await();
|
||||||
|
} catch (BrokenBarrierException e) {
|
||||||
|
}
|
||||||
|
assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startContainers(NodeManager nm) throws IOException {
|
||||||
|
ContainerManagerImpl containerManager = nm.getContainerManager();
|
||||||
|
File scriptFile = createUnhaltingScriptFile();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
// Construct the Container-id
|
||||||
|
ContainerId cId = createContainerId();
|
||||||
|
containerLaunchContext.setContainerId(cId);
|
||||||
|
|
||||||
|
containerLaunchContext.setUser(user);
|
||||||
|
|
||||||
|
URL localResourceUri =
|
||||||
|
ConverterUtils.getYarnUrlFromPath(localFS
|
||||||
|
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
||||||
|
LocalResource localResource =
|
||||||
|
recordFactory.newRecordInstance(LocalResource.class);
|
||||||
|
localResource.setResource(localResourceUri);
|
||||||
|
localResource.setSize(-1);
|
||||||
|
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||||
|
localResource.setType(LocalResourceType.FILE);
|
||||||
|
localResource.setTimestamp(scriptFile.lastModified());
|
||||||
|
String destinationFile = "dest_file";
|
||||||
|
Map<String, LocalResource> localResources =
|
||||||
|
new HashMap<String, LocalResource>();
|
||||||
|
localResources.put(destinationFile, localResource);
|
||||||
|
containerLaunchContext.setLocalResources(localResources);
|
||||||
|
containerLaunchContext.setUser(containerLaunchContext.getUser());
|
||||||
|
List<String> commands = new ArrayList<String>();
|
||||||
|
commands.add("/bin/bash");
|
||||||
|
commands.add(scriptFile.getAbsolutePath());
|
||||||
|
containerLaunchContext.setCommands(commands);
|
||||||
|
containerLaunchContext.setResource(recordFactory
|
||||||
|
.newRecordInstance(Resource.class));
|
||||||
|
containerLaunchContext.getResource().setMemory(1024);
|
||||||
|
StartContainerRequest startRequest =
|
||||||
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
|
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
containerManager.startContainer(startRequest);
|
||||||
|
|
||||||
|
GetContainerStatusRequest request =
|
||||||
|
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
|
||||||
|
request.setContainerId(cId);
|
||||||
|
ContainerStatus containerStatus =
|
||||||
|
containerManager.getContainerStatus(request).getStatus();
|
||||||
|
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
|
||||||
|
}
|
||||||
|
|
||||||
private ContainerId createContainerId() {
|
private ContainerId createContainerId() {
|
||||||
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
||||||
appId.setClusterTimestamp(0);
|
appId.setClusterTimestamp(0);
|
||||||
|
@ -226,4 +256,48 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TestNodeManager extends NodeManager {
|
||||||
|
|
||||||
|
private int registrationCount = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||||
|
return new TestNodeStatusUpdaterImpl(context, dispatcher,
|
||||||
|
healthChecker, metrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNMRegistrationCount() {
|
||||||
|
return registrationCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
|
||||||
|
|
||||||
|
public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||||
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||||
|
super(context, dispatcher, healthChecker, metrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void registerWithRM() throws YarnRemoteException {
|
||||||
|
super.registerWithRM();
|
||||||
|
registrationCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rebootNodeStatusUpdater() {
|
||||||
|
ConcurrentMap<ContainerId, Container> containers =
|
||||||
|
getNMContext().getContainers();
|
||||||
|
// ensure that containers are empty before restart nodeStatusUpdater
|
||||||
|
Assert.assertTrue(containers.isEmpty());
|
||||||
|
super.rebootNodeStatusUpdater();
|
||||||
|
try {
|
||||||
|
syncBarrier.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
} catch (BrokenBarrierException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,6 @@ public class TestNodeStatusUpdater {
|
||||||
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
||||||
private final Configuration conf = createNMConfig();
|
private final Configuration conf = createNMConfig();
|
||||||
private NodeManager nm;
|
private NodeManager nm;
|
||||||
protected NodeManager rebootedNodeManager;
|
|
||||||
private boolean containerStatusBackupSuccessfully = true;
|
private boolean containerStatusBackupSuccessfully = true;
|
||||||
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
|
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
|
||||||
|
|
||||||
|
@ -663,8 +662,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void cleanupContainers() {
|
protected void cleanupContainers(NodeManagerEventType eventType) {
|
||||||
super.cleanupContainers();
|
super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
|
||||||
numCleanups.incrementAndGet();
|
numCleanups.incrementAndGet();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -717,50 +716,6 @@ public void testNodeDecommision() throws Exception {
|
||||||
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testNodeReboot() throws Exception {
|
|
||||||
nm = getNodeManager(NodeAction.REBOOT);
|
|
||||||
YarnConfiguration conf = createNMConfig();
|
|
||||||
nm.init(conf);
|
|
||||||
Assert.assertEquals(STATE.INITED, nm.getServiceState());
|
|
||||||
nm.start();
|
|
||||||
|
|
||||||
int waitCount = 0;
|
|
||||||
while (heartBeatID < 1 && waitCount++ != 20) {
|
|
||||||
Thread.sleep(500);
|
|
||||||
}
|
|
||||||
Assert.assertFalse(heartBeatID < 1);
|
|
||||||
|
|
||||||
// NM takes a while to reach the STOPPED state.
|
|
||||||
waitCount = 0;
|
|
||||||
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
|
|
||||||
LOG.info("Waiting for NM to stop..");
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
|
||||||
|
|
||||||
waitCount = 0;
|
|
||||||
while (null == rebootedNodeManager && waitCount++ != 20) {
|
|
||||||
LOG.info("Waiting for NM to reinitialize..");
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
waitCount = 0;
|
|
||||||
while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
|
|
||||||
LOG.info("Waiting for NM to start..");
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
|
|
||||||
|
|
||||||
rebootedNodeManager.stop();
|
|
||||||
waitCount = 0;
|
|
||||||
while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
|
|
||||||
LOG.info("Waiting for NM to stop..");
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNMShutdownForRegistrationFailure() {
|
public void testNMShutdownForRegistrationFailure() {
|
||||||
|
|
||||||
|
@ -1108,12 +1063,6 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
|
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
|
||||||
return myNodeStatusUpdater;
|
return myNodeStatusUpdater;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
NodeManager createNewNodeManager() {
|
|
||||||
rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
|
|
||||||
return rebootedNodeManager;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,13 +73,13 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
private Server server;
|
private Server server;
|
||||||
private InetSocketAddress resourceTrackerAddress;
|
private InetSocketAddress resourceTrackerAddress;
|
||||||
|
|
||||||
private static final NodeHeartbeatResponse reboot = recordFactory
|
private static final NodeHeartbeatResponse resync = recordFactory
|
||||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||||
private static final NodeHeartbeatResponse shutDown = recordFactory
|
private static final NodeHeartbeatResponse shutDown = recordFactory
|
||||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||||
|
|
||||||
static {
|
static {
|
||||||
reboot.setNodeAction(NodeAction.REBOOT);
|
resync.setNodeAction(NodeAction.RESYNC);
|
||||||
|
|
||||||
shutDown.setNodeAction(NodeAction.SHUTDOWN);
|
shutDown.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
}
|
}
|
||||||
|
@ -220,7 +220,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||||
if (rmNode == null) {
|
if (rmNode == null) {
|
||||||
/* node does not exist */
|
/* node does not exist */
|
||||||
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
|
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
|
||||||
return reboot;
|
return resync;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send ping
|
// Send ping
|
||||||
|
@ -250,7 +250,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||||
// TODO: Just sending reboot is not enough. Think more.
|
// TODO: Just sending reboot is not enough. Think more.
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
|
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
|
||||||
return reboot;
|
return resync;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Heartbeat response
|
// Heartbeat response
|
||||||
|
|
|
@ -225,9 +225,9 @@ public void testRMRestart() throws Exception {
|
||||||
|
|
||||||
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
||||||
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
|
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
|
||||||
hbResponse = nm2.nodeHeartbeat(true);
|
hbResponse = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
|
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
|
||||||
|
|
||||||
// new NM to represent NM re-register
|
// new NM to represent NM re-register
|
||||||
nm1 = rm2.registerNode("h1:1234", 15120);
|
nm1 = rm2.registerNode("h1:1234", 15120);
|
||||||
|
@ -235,9 +235,9 @@ public void testRMRestart() throws Exception {
|
||||||
|
|
||||||
// verify no more reboot response sent
|
// verify no more reboot response sent
|
||||||
hbResponse = nm1.nodeHeartbeat(true);
|
hbResponse = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
|
Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
|
||||||
hbResponse = nm2.nodeHeartbeat(true);
|
hbResponse = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
|
Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
|
||||||
|
|
||||||
// assert app1 attempt is saved
|
// assert app1 attempt is saved
|
||||||
attempt1 = loadedApp1.getCurrentAppAttempt();
|
attempt1 = loadedApp1.getCurrentAppAttempt();
|
||||||
|
|
|
@ -282,7 +282,7 @@ public void testReboot() throws Exception {
|
||||||
|
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(
|
nodeHeartbeat = nm2.nodeHeartbeat(
|
||||||
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
|
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
|
||||||
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
|
||||||
checkRebootedNMCount(rm, ++initialMetricCount);
|
checkRebootedNMCount(rm, ++initialMetricCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,6 +130,6 @@ public void testRPCResponseId() throws IOException {
|
||||||
|
|
||||||
nodeStatus.setResponseId(0);
|
nodeStatus.setResponseId(0);
|
||||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
|
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
|
||||||
Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
|
Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction()));
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue