YARN-376. Fixes a bug which would prevent the NM knowing about completed containers and applications. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1451473 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-03-01 05:59:54 +00:00
parent 5d6eca08bd
commit 83d8065867
6 changed files with 66 additions and 8 deletions

View File

@ -47,6 +47,9 @@ Release 2.0.4-beta - UNRELEASED
YARN-406. Fix TestRackResolver to function in networks where "host1"
resolves to a valid host. (Hitesh Shah via sseth)
YARN-376. Fixes a bug which would prevent the NM knowing about completed
containers and applications. (Jason Lowe via sseth)
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES

View File

@ -262,8 +262,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
HeartbeatResponse latestResponse = recordFactory
.newRecordInstance(HeartbeatResponse.class);
latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp());
latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
rmNode.updateHeartbeatResponseForCleanup(latestResponse);
latestResponse.setNodeAction(NodeAction.NORMAL);
// Check if node's masterKey needs to be updated and if the currentKey has

View File

@ -105,6 +105,13 @@ public interface RMNode {
public List<ApplicationId> getAppsToCleanup();
/**
* Update a {@link HeartbeatResponse} with the list of containers and
* applications to clean up for this node.
* @param response the {@link HeartbeatResponse} to update
*/
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response);
public HeartbeatResponse getLastHeartBeatResponse();
/**

View File

@ -303,6 +303,21 @@ public List<ContainerId> getContainersToCleanUp() {
}
};
@Override
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
this.writeLock.lock();
try {
response.addAllContainersToCleanup(
new ArrayList<ContainerId>(this.containersToClean));
response.addAllApplicationsToCleanup(this.finishedApplications);
this.containersToClean.clear();
this.finishedApplications.clear();
} finally {
this.writeLock.unlock();
}
};
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
@ -564,12 +579,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
// HeartBeat processing from our end is done, as node pulls the following
// lists before sending status-updates. Clear data-structures
// TODO: These lists could go to the NM multiple times, or never.
rmNode.containersToClean.clear();
rmNode.finishedApplications.clear();
return NodeState.RUNNING;
}
}

View File

@ -186,6 +186,10 @@ public List<ApplicationId> getAppsToCleanup() {
return null;
}
@Override
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
}
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
return null;

View File

@ -30,6 +30,7 @@
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@ -38,6 +39,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@ -51,6 +53,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -299,6 +302,39 @@ public void testUnhealthyRebooting() {
Assert.assertEquals(NodeState.REBOOTED, node.getState());
}
@Test(timeout=20000)
public void testUpdateHeartbeatResponseForCleanup() {
RMNodeImpl node = getRunningNode();
NodeId nodeId = node.getNodeID();
// Expire a container
ContainerId completedContainerId = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(0, 0), 0), 0);
node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
Assert.assertEquals(1, node.getContainersToCleanUp().size());
// Finish an application
ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1);
node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
Assert.assertEquals(1, node.getAppsToCleanup().size());
// Verify status update does not clear containers/apps to cleanup
// but updating heartbeat response for cleanup does
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
node.handle(statusEvent);
Assert.assertEquals(1, node.getContainersToCleanUp().size());
Assert.assertEquals(1, node.getAppsToCleanup().size());
HeartbeatResponse hbrsp = Records.newRecord(HeartbeatResponse.class);
node.updateHeartbeatResponseForCleanup(hbrsp);
Assert.assertEquals(0, node.getContainersToCleanUp().size());
Assert.assertEquals(0, node.getAppsToCleanup().size());
Assert.assertEquals(1, hbrsp.getContainersToCleanupCount());
Assert.assertEquals(completedContainerId, hbrsp.getContainerToCleanup(0));
Assert.assertEquals(1, hbrsp.getApplicationsToCleanupCount());
Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup(0));
}
private RMNodeImpl getRunningNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,