merge YARN-376 from trunk. Fixes a bug which would prevent the NM knowing about completed containers and applications. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1451474 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6072cb98a9
commit
2b7d8ed210
|
@ -33,6 +33,9 @@ Release 2.0.4-beta - UNRELEASED
|
||||||
YARN-406. Fix TestRackResolver to function in networks where "host1"
|
YARN-406. Fix TestRackResolver to function in networks where "host1"
|
||||||
resolves to a valid host. (Hitesh Shah via sseth)
|
resolves to a valid host. (Hitesh Shah via sseth)
|
||||||
|
|
||||||
|
YARN-376. Fixes a bug which would prevent the NM knowing about completed
|
||||||
|
containers and applications. (Jason Lowe via sseth)
|
||||||
|
|
||||||
Release 2.0.3-alpha - 2013-02-06
|
Release 2.0.3-alpha - 2013-02-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -262,8 +262,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
HeartbeatResponse latestResponse = recordFactory
|
HeartbeatResponse latestResponse = recordFactory
|
||||||
.newRecordInstance(HeartbeatResponse.class);
|
.newRecordInstance(HeartbeatResponse.class);
|
||||||
latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
|
latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
|
||||||
latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp());
|
rmNode.updateHeartbeatResponseForCleanup(latestResponse);
|
||||||
latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
|
|
||||||
latestResponse.setNodeAction(NodeAction.NORMAL);
|
latestResponse.setNodeAction(NodeAction.NORMAL);
|
||||||
|
|
||||||
// Check if node's masterKey needs to be updated and if the currentKey has
|
// Check if node's masterKey needs to be updated and if the currentKey has
|
||||||
|
|
|
@ -105,6 +105,13 @@ public interface RMNode {
|
||||||
|
|
||||||
public List<ApplicationId> getAppsToCleanup();
|
public List<ApplicationId> getAppsToCleanup();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update a {@link HeartbeatResponse} with the list of containers and
|
||||||
|
* applications to clean up for this node.
|
||||||
|
* @param response the {@link HeartbeatResponse} to update
|
||||||
|
*/
|
||||||
|
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response);
|
||||||
|
|
||||||
public HeartbeatResponse getLastHeartBeatResponse();
|
public HeartbeatResponse getLastHeartBeatResponse();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -303,6 +303,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
|
||||||
|
this.writeLock.lock();
|
||||||
|
|
||||||
|
try {
|
||||||
|
response.addAllContainersToCleanup(
|
||||||
|
new ArrayList<ContainerId>(this.containersToClean));
|
||||||
|
response.addAllApplicationsToCleanup(this.finishedApplications);
|
||||||
|
this.containersToClean.clear();
|
||||||
|
this.finishedApplications.clear();
|
||||||
|
} finally {
|
||||||
|
this.writeLock.unlock();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HeartbeatResponse getLastHeartBeatResponse() {
|
public HeartbeatResponse getLastHeartBeatResponse() {
|
||||||
|
|
||||||
|
@ -564,12 +579,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
|
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
|
||||||
statusEvent.getKeepAliveAppIds());
|
statusEvent.getKeepAliveAppIds());
|
||||||
|
|
||||||
// HeartBeat processing from our end is done, as node pulls the following
|
|
||||||
// lists before sending status-updates. Clear data-structures
|
|
||||||
// TODO: These lists could go to the NM multiple times, or never.
|
|
||||||
rmNode.containersToClean.clear();
|
|
||||||
rmNode.finishedApplications.clear();
|
|
||||||
|
|
||||||
return NodeState.RUNNING;
|
return NodeState.RUNNING;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,6 +186,10 @@ public class MockNodes {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HeartbeatResponse getLastHeartBeatResponse() {
|
public HeartbeatResponse getLastHeartBeatResponse() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.List;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
|
@ -51,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -299,6 +302,39 @@ public class TestRMNodeTransitions {
|
||||||
Assert.assertEquals(NodeState.REBOOTED, node.getState());
|
Assert.assertEquals(NodeState.REBOOTED, node.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=20000)
|
||||||
|
public void testUpdateHeartbeatResponseForCleanup() {
|
||||||
|
RMNodeImpl node = getRunningNode();
|
||||||
|
NodeId nodeId = node.getNodeID();
|
||||||
|
|
||||||
|
// Expire a container
|
||||||
|
ContainerId completedContainerId = BuilderUtils.newContainerId(
|
||||||
|
BuilderUtils.newApplicationAttemptId(
|
||||||
|
BuilderUtils.newApplicationId(0, 0), 0), 0);
|
||||||
|
node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
|
||||||
|
Assert.assertEquals(1, node.getContainersToCleanUp().size());
|
||||||
|
|
||||||
|
// Finish an application
|
||||||
|
ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1);
|
||||||
|
node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
|
||||||
|
Assert.assertEquals(1, node.getAppsToCleanup().size());
|
||||||
|
|
||||||
|
// Verify status update does not clear containers/apps to cleanup
|
||||||
|
// but updating heartbeat response for cleanup does
|
||||||
|
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
|
||||||
|
node.handle(statusEvent);
|
||||||
|
Assert.assertEquals(1, node.getContainersToCleanUp().size());
|
||||||
|
Assert.assertEquals(1, node.getAppsToCleanup().size());
|
||||||
|
HeartbeatResponse hbrsp = Records.newRecord(HeartbeatResponse.class);
|
||||||
|
node.updateHeartbeatResponseForCleanup(hbrsp);
|
||||||
|
Assert.assertEquals(0, node.getContainersToCleanUp().size());
|
||||||
|
Assert.assertEquals(0, node.getAppsToCleanup().size());
|
||||||
|
Assert.assertEquals(1, hbrsp.getContainersToCleanupCount());
|
||||||
|
Assert.assertEquals(completedContainerId, hbrsp.getContainerToCleanup(0));
|
||||||
|
Assert.assertEquals(1, hbrsp.getApplicationsToCleanupCount());
|
||||||
|
Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup(0));
|
||||||
|
}
|
||||||
|
|
||||||
private RMNodeImpl getRunningNode() {
|
private RMNodeImpl getRunningNode() {
|
||||||
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
|
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
|
||||||
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
|
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
|
||||||
|
|
Loading…
Reference in New Issue