YARN-1278. Fixed NodeManager to not delete local resources for apps on resync command from RM - a bug caused by YARN-1149. Contributed by Hitesh Shah.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529657 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cb5a51565a
commit
44f7ee7192
|
@ -188,6 +188,9 @@ Release 2.1.2 - UNRELEASED
|
|||
and log-dirs correctly even when there are no resources to localize for the
|
||||
container. (Siddharth Seth via vinodkv)
|
||||
|
||||
YARN-1278. Fixed NodeManager to not delete local resources for apps on resync
|
||||
command from RM - a bug caused by YARN-1149. (Hitesh Shah via vinodkv)
|
||||
|
||||
Release 2.1.1-beta - 2013-09-23
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -25,13 +25,39 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
|
||||
|
||||
private final List<ContainerId> containerToCleanup;
|
||||
private final Reason reason;
|
||||
|
||||
public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) {
|
||||
public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup,
|
||||
Reason reason) {
|
||||
super(ContainerManagerEventType.FINISH_CONTAINERS);
|
||||
this.containerToCleanup = containersToCleanup;
|
||||
this.reason = reason;
|
||||
}
|
||||
|
||||
public List<ContainerId> getContainersToCleanup() {
|
||||
return this.containerToCleanup;
|
||||
}
|
||||
|
||||
public Reason getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
public static enum Reason {
|
||||
/**
|
||||
* Container is killed as NodeManager is shutting down
|
||||
*/
|
||||
ON_SHUTDOWN,
|
||||
|
||||
/**
|
||||
* Container is killed as the Nodemanager is re-syncing with the
|
||||
* ResourceManager
|
||||
*/
|
||||
ON_NODEMANAGER_RESYNC,
|
||||
|
||||
/**
|
||||
* Container is killed on request by the ResourceManager
|
||||
*/
|
||||
BY_RESOURCEMANAGER
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -226,7 +226,8 @@ public class NodeManager extends CompositeService
|
|||
public void run() {
|
||||
LOG.info("Notifying ContainerManager to block new container-requests");
|
||||
containerManager.setBlockNewContainerRequests(true);
|
||||
containerManager.cleanUpApplications(NodeManagerEventType.RESYNC);
|
||||
LOG.info("Cleaning up running containers on resync");
|
||||
containerManager.cleanupContainersOnNMResync();
|
||||
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
|
||||
}
|
||||
}.start();
|
||||
|
|
|
@ -501,7 +501,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
.getContainersToCleanup();
|
||||
if (!containersToCleanup.isEmpty()) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new CMgrCompletedContainersEvent(containersToCleanup));
|
||||
new CMgrCompletedContainersEvent(containersToCleanup,
|
||||
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
|
||||
}
|
||||
List<ApplicationId> appsToCleanup =
|
||||
response.getApplicationsToCleanup();
|
||||
|
|
|
@ -87,7 +87,6 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
|
@ -306,7 +305,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
try {
|
||||
serviceStopped = true;
|
||||
if (context != null) {
|
||||
cleanUpApplications(NodeManagerEventType.SHUTDOWN);
|
||||
cleanUpApplicationsOnNMShutDown();
|
||||
}
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
|
@ -320,7 +319,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
super.serviceStop();
|
||||
}
|
||||
|
||||
public void cleanUpApplications(NodeManagerEventType eventType) {
|
||||
public void cleanUpApplicationsOnNMShutDown() {
|
||||
Map<ApplicationId, Application> applications =
|
||||
this.context.getApplications();
|
||||
if (applications.isEmpty()) {
|
||||
|
@ -336,34 +335,16 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
LOG.info("Waiting for Applications to be Finished");
|
||||
|
||||
switch (eventType) {
|
||||
case SHUTDOWN:
|
||||
long waitStartTime = System.currentTimeMillis();
|
||||
while (!applications.isEmpty()
|
||||
&& System.currentTimeMillis() - waitStartTime
|
||||
< waitForContainersOnShutdownMillis) {
|
||||
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.warn("Interrupted while sleeping on applications finish on shutdown",
|
||||
ex);
|
||||
LOG.warn(
|
||||
"Interrupted while sleeping on applications finish on shutdown", ex);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case RESYNC:
|
||||
while (!applications.isEmpty()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.warn("Interrupted while sleeping on applications finish on resync",
|
||||
ex);
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new YarnRuntimeException("Get an unknown NodeManagerEventType: "
|
||||
+ eventType);
|
||||
}
|
||||
|
||||
// All applications Finished
|
||||
if (applications.isEmpty()) {
|
||||
|
@ -374,6 +355,40 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
public void cleanupContainersOnNMResync() {
|
||||
Map<ContainerId, Container> containers = context.getContainers();
|
||||
if (containers.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
LOG.info("Containers still running on "
|
||||
+ CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC + " : "
|
||||
+ containers.keySet());
|
||||
|
||||
List<ContainerId> containerIds =
|
||||
new ArrayList<ContainerId>(containers.keySet());
|
||||
|
||||
LOG.info("Waiting for containers to be killed");
|
||||
|
||||
this.handle(new CMgrCompletedContainersEvent(containerIds,
|
||||
CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
|
||||
while (!containers.isEmpty()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.warn("Interrupted while sleeping on container kill on resync", ex);
|
||||
}
|
||||
}
|
||||
|
||||
// All containers killed
|
||||
if (containers.isEmpty()) {
|
||||
LOG.info("All containers in DONE state");
|
||||
} else {
|
||||
LOG.info("Done waiting for containers to be killed. Still alive: " +
|
||||
containers.keySet());
|
||||
}
|
||||
}
|
||||
|
||||
// Get the remoteUGI corresponding to the api call.
|
||||
protected UserGroupInformation getRemoteUgi()
|
||||
throws YarnException {
|
||||
|
@ -850,7 +865,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
break;
|
||||
default:
|
||||
throw new YarnRuntimeException(
|
||||
"Get an unknown ContainerManagerEvent type: " + event.getType());
|
||||
"Got an unknown ContainerManagerEvent type: " + event.getType());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -102,7 +102,11 @@ public class TestNodeManagerResync {
|
|||
} catch (BrokenBarrierException e) {
|
||||
}
|
||||
Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount());
|
||||
|
||||
// Only containers should be killed on resync, apps should lie around. That
|
||||
// way local resources for apps can be used beyond resync without
|
||||
// relocalization
|
||||
Assert.assertTrue(nm.getNMContext().getApplications()
|
||||
.containsKey(cId.getApplicationAttemptId().getApplicationId()));
|
||||
Assert.assertFalse(assertionFailedInThread.get());
|
||||
|
||||
nm.stop();
|
||||
|
@ -285,7 +289,6 @@ public class TestNodeManagerResync {
|
|||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
try {
|
||||
while (!isStopped && numContainers < 10) {
|
||||
ContainerId cId = TestNodeManagerShutdown.createContainerId();
|
||||
StartContainerRequest scRequest =
|
||||
StartContainerRequest.newInstance(containerLaunchContext,
|
||||
null);
|
||||
|
|
|
@ -93,6 +93,7 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class TestNodeStatusUpdater {
|
||||
|
||||
// temp fix until metrics system can auto-detect itself running in unit test:
|
||||
|
@ -352,7 +353,6 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
|
||||
|
||||
private Context context;
|
||||
private final long rmStartIntervalMS;
|
||||
private final boolean rmNeverStart;
|
||||
public ResourceTracker resourceTracker;
|
||||
|
@ -360,7 +360,6 @@ public class TestNodeStatusUpdater {
|
|||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||
long rmStartIntervalMS, boolean rmNeverStart) {
|
||||
super(context, dispatcher, healthChecker, metrics);
|
||||
this.context = context;
|
||||
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||
this.rmNeverStart = rmNeverStart;
|
||||
}
|
||||
|
@ -376,8 +375,8 @@ public class TestNodeStatusUpdater {
|
|||
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
|
||||
resourceTracker =
|
||||
(ResourceTracker) RetryProxy.create(ResourceTracker.class,
|
||||
new MyResourceTracker6(this.context, rmStartIntervalMS,
|
||||
rmNeverStart), retryPolicy);
|
||||
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
|
||||
retryPolicy);
|
||||
return resourceTracker;
|
||||
}
|
||||
|
||||
|
@ -685,14 +684,11 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
private class MyResourceTracker6 implements ResourceTracker {
|
||||
|
||||
private final Context context;
|
||||
private long rmStartIntervalMS;
|
||||
private boolean rmNeverStart;
|
||||
private final long waitStartTime;
|
||||
|
||||
public MyResourceTracker6(Context context, long rmStartIntervalMS,
|
||||
boolean rmNeverStart) {
|
||||
this.context = context;
|
||||
public MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart) {
|
||||
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||
this.rmNeverStart = rmNeverStart;
|
||||
this.waitStartTime = System.currentTimeMillis();
|
||||
|
@ -868,8 +864,8 @@ public class TestNodeStatusUpdater {
|
|||
metrics, aclsManager, dirsHandler) {
|
||||
|
||||
@Override
|
||||
public void cleanUpApplications(NodeManagerEventType eventType) {
|
||||
super.cleanUpApplications(NodeManagerEventType.SHUTDOWN);
|
||||
public void cleanUpApplicationsOnNMShutDown() {
|
||||
super.cleanUpApplicationsOnNMShutDown();
|
||||
numCleanups.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
@ -1222,8 +1218,8 @@ public class TestNodeStatusUpdater {
|
|||
metrics, aclsManager, dirsHandler) {
|
||||
|
||||
@Override
|
||||
public void cleanUpApplications(NodeManagerEventType eventType) {
|
||||
super.cleanUpApplications(NodeManagerEventType.SHUTDOWN);
|
||||
public void cleanUpApplicationsOnNMShutDown() {
|
||||
super.cleanUpApplicationsOnNMShutDown();
|
||||
numCleanups.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue