YARN-2244. FairScheduler missing handling of containers for unknown application attempts. (Anubhav Dhoot via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1611841 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-07-19 00:20:16 +00:00
parent 5b0492f7c5
commit fcb0fe0548
6 changed files with 98 additions and 79 deletions

View File

@ -53,6 +53,9 @@ Release 2.6.0 - UNRELEASED
after RM recovery but before scheduler learns about apps and app-attempts.
(Jian He via vinodkv)
YARN-2244. FairScheduler missing handling of containers for unknown
application attempts. (Anubhav Dhoot via kasha)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -123,6 +123,23 @@ public abstract class AbstractYarnScheduler
return maximumAllocation;
}
protected void containerLaunchedOnNode(ContainerId containerId,
SchedulerNode node) {
// Get the application for the finished container
SchedulerApplicationAttempt application = getCurrentAttemptForContainer
(containerId);
if (application == null) {
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
}
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication<T> app =
applications.get(applicationAttemptId.getApplicationId());

View File

@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -866,21 +865,6 @@ public class CapacityScheduler extends
}
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
}
@Override
public void handle(SchedulerEvent event) {
switch(event.getType()) {

View File

@ -928,22 +928,6 @@ public class FairScheduler extends
}
}
/**
* Process a container which has launched on a node, as reported by the node.
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
return;
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
}
/**
* Process a heartbeat update from a node.
*/

View File

@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@ -831,23 +830,6 @@ public class FifoScheduler extends
}
}
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
// Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
}
@Lock(FifoScheduler.class)
private synchronized void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {

View File

@ -232,20 +232,7 @@ public class TestApplicationCleanup {
containerStatuses.put(app.getApplicationId(), containerStatusList);
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
dispatcher.await();
List<ContainerId> contsToClean = resp.getContainersToCleanup();
int cleanedConts = contsToClean.size();
waitCount = 0;
while (cleanedConts < 1 && waitCount++ < 200) {
LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
Thread.sleep(100);
resp = nm1.nodeHeartbeat(true);
dispatcher.await();
contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size();
}
LOG.info("Got cleanup for " + contsToClean.get(0));
Assert.assertEquals(1, cleanedConts);
waitForContainerCleanup(dispatcher, nm1, resp);
// Now to test the case when RM already gave cleanup, and NM suddenly
// realizes that the container is running.
@ -258,26 +245,36 @@ public class TestApplicationCleanup {
containerStatuses.put(app.getApplicationId(), containerStatusList);
resp = nm1.nodeHeartbeat(containerStatuses, true);
dispatcher.await();
contsToClean = resp.getContainersToCleanup();
cleanedConts = contsToClean.size();
// The cleanup list won't be instantaneous as it is given out by scheduler
// and not RMNodeImpl.
waitCount = 0;
while (cleanedConts < 1 && waitCount++ < 200) {
LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
Thread.sleep(100);
resp = nm1.nodeHeartbeat(true);
dispatcher.await();
contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size();
}
LOG.info("Got cleanup for " + contsToClean.get(0));
Assert.assertEquals(1, cleanedConts);
waitForContainerCleanup(dispatcher, nm1, resp);
rm.stop();
}
protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
NodeHeartbeatResponse resp) throws Exception {
int waitCount = 0, cleanedConts = 0;
List<ContainerId> contsToClean;
do {
dispatcher.await();
contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size();
if (cleanedConts >= 1) {
break;
}
Thread.sleep(100);
resp = nm.nodeHeartbeat(true);
} while(waitCount++ < 200);
if (contsToClean.isEmpty()) {
LOG.error("Failed to get any containers to cleanup");
} else {
LOG.info("Got cleanup for " + contsToClean.get(0));
}
Assert.assertEquals(1, cleanedConts);
}
private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
throws Exception {
while (true) {
@ -400,6 +397,58 @@ public class TestApplicationCleanup {
rm2.stop();
}
@SuppressWarnings("resource")
@Test (timeout = 60000)
public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// start RM
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the AM
RMApp app0 = rm1.submitApp(200);
MockAM am0 = launchAM(app0, rm1, nm1);
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
// start new RM
final DrainDispatcher dispatcher2 = new DrainDispatcher();
MockRM rm2 = new MockRM(conf, memStore) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher2;
}
};
rm2.start();
// nm1 register to rm2, and do a heartbeat
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1.registerNode(Arrays.asList(app0.getApplicationId()));
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
// Add unknown container for application unknown to scheduler
NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
.getApplicationAttemptId(), 2, ContainerState.RUNNING);
waitForContainerCleanup(dispatcher2, nm1, response);
rm1.stop();
rm2.stop();
}
public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup();
t.testAppCleanup();