YARN-3194. RM should handle NMContainerStatuses sent by NM while registering if NM is Reconnected node. Contributed by Rohith

(cherry picked from commit a64dd3d24b)
This commit is contained in:
Jason Lowe 2015-10-08 16:33:34 +00:00
parent 1484ebb602
commit 528b809d2d
6 changed files with 209 additions and 48 deletions

View File

@ -39,6 +39,9 @@ Release 2.6.2 - UNRELEASED
YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
after NM is reconnected. (zhihai xu via xgong)
YARN-3194. RM should handle NMContainerStatuses sent by NM while
registering if NM is Reconnected node (Rohith via jlowe)
Release 2.6.1 - 2015-09-23
INCOMPATIBLE CHANGES

View File

@ -312,9 +312,12 @@ public class ResourceTrackerService extends AbstractService implements
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeReconnectEvent(nodeId, rmNode,
request.getRunningApplications()));
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMNodeReconnectEvent(nodeId, rmNode, request
.getRunningApplications(), request.getNMContainerStatuses()));
}
// On every node manager register we will be clearing NMToken keys if
// present for any running application.

View File

@ -602,6 +602,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
isCapabilityChanged = true;
}
handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
@ -621,6 +623,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
}
private void handleNMContainerStatus(
List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
List<ContainerStatus> containerStatuses =
new ArrayList<ContainerStatus>();
for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
containerStatuses.add(createContainerStatus(nmContainerStatus));
}
rmnode.handleContainerStatus(containerStatuses);
}
private ContainerStatus createContainerStatus(
NMContainerStatus remoteContainer) {
ContainerStatus cStatus =
ContainerStatus.newInstance(remoteContainer.getContainerId(),
remoteContainer.getContainerState(),
remoteContainer.getDiagnostics(),
remoteContainer.getContainerExitStatus());
return cStatus;
}
}
public static class UpdateNodeResourceWhenRunningTransition
@ -746,49 +768,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
return NodeState.UNHEALTHY;
}
// Filter the map to only obtain just launched containers and finished
// containers.
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
ContainerId containerId = remoteContainer.getContainerId();
rmNode.handleContainerStatus(statusEvent.getContainers());
// Don't bother with containers already scheduled for cleanup, or for
// applications already killed. The scheduler doens't need to know any
// more about this container
if (rmNode.containersToClean.contains(containerId)) {
LOG.info("Container " + containerId + " already scheduled for " +
"cleanup, no further processing");
continue;
}
if (rmNode.finishedApplications.contains(containerId
.getApplicationAttemptId().getApplicationId())) {
LOG.info("Container " + containerId
+ " belongs to an application that is already killed,"
+ " no further processing");
continue;
}
// Process running containers
if (remoteContainer.getState() == ContainerState.RUNNING) {
if (!rmNode.launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time.
rmNode.launchedContainers.add(containerId);
newlyLaunchedContainers.add(remoteContainer);
}
} else {
// A finished container
rmNode.launchedContainers.remove(containerId);
completedContainers.add(remoteContainer);
}
}
if(newlyLaunchedContainers.size() != 0
|| completedContainers.size() != 0) {
rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
(newlyLaunchedContainers, completedContainers));
}
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
rmNode.context.getDispatcher().getEventHandler().handle(
@ -872,4 +853,50 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
return nlm.getLabelsOnNode(nodeId);
}
private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
// Filter the map to only obtain just launched containers and finished
// containers.
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
for (ContainerStatus remoteContainer : containerStatuses) {
ContainerId containerId = remoteContainer.getContainerId();
// Don't bother with containers already scheduled for cleanup, or for
// applications already killed. The scheduler doens't need to know any
// more about this container
if (containersToClean.contains(containerId)) {
LOG.info("Container " + containerId + " already scheduled for "
+ "cleanup, no further processing");
continue;
}
if (finishedApplications.contains(containerId.getApplicationAttemptId()
.getApplicationId())) {
LOG.info("Container " + containerId
+ " belongs to an application that is already killed,"
+ " no further processing");
continue;
}
// Process running containers
if (remoteContainer.getState() == ContainerState.RUNNING) {
if (!launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time.
launchedContainers.add(containerId);
newlyLaunchedContainers.add(remoteContainer);
}
} else {
// A finished container
launchedContainers.remove(containerId);
completedContainers.add(remoteContainer);
}
}
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
completedContainers));
}
}
}

View File

@ -22,16 +22,19 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
public class RMNodeReconnectEvent extends RMNodeEvent {
private RMNode reconnectedNode;
private List<ApplicationId> runningApplications;
private List<NMContainerStatus> containerStatuses;
public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
List<ApplicationId> runningApps) {
List<ApplicationId> runningApps, List<NMContainerStatus> containerReports) {
super(nodeId, RMNodeEventType.RECONNECTED);
reconnectedNode = newNode;
runningApplications = runningApps;
containerStatuses = containerReports;
}
public RMNode getReconnectedNode() {
@ -41,4 +44,8 @@ public class RMNodeReconnectEvent extends RMNodeEvent {
public List<ApplicationId> getRunningApplications() {
return runningApplications;
}
public List<NMContainerStatus> getNMContainerStatuses() {
return containerStatuses;
}
}

View File

@ -28,7 +28,9 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.log4j.Level;
@ -478,6 +481,124 @@ public class TestApplicationCleanup {
rm1.stop();
}
// The test verifies processing of NMContainerStatuses which are sent during
// NM registration.
// 1. Start the cluster-RM,NM,Submit app with 1024MB,Launch & register AM
// 2. AM sends ResourceRequest for 1 container with memory 2048MB.
// 3. Verify for number of container allocated by RM
// 4. Verify Memory Usage by cluster, it should be 3072. AM memory + requested
// memory. 1024 + 2048=3072
// 5. Re-register NM by sending completed container status
// 6. Verify for Memory Used, it should be 1024
// 7. Send AM heatbeat to RM. Allocated response should contain completed
// container.
@Test(timeout = 60000)
public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// 1. Start the cluster-RM,NM,Submit app with 1024MB,Launch & register AM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
int nmMemory = 8192;
int amMemory = 1024;
int containerMemory = 2048;
MockNM nm1 =
new MockNM("127.0.0.1:1234", nmMemory, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app0 = rm1.submitApp(amMemory);
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
// 2. AM sends ResourceRequest for 1 container with memory 2048MB.
int noOfContainers = 1;
List<Container> allocateContainers =
am0.allocateAndWaitForContainers(noOfContainers, containerMemory, nm1);
// 3. Verify for number of container allocated by RM
Assert.assertEquals(noOfContainers, allocateContainers.size());
Container container = allocateContainers.get(0);
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), container.getId()
.getContainerId(), ContainerState.RUNNING);
rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
// 4. Verify Memory Usage by cluster, it should be 3072. AM memory +
// requested memory. 1024 + 2048=3072
ResourceScheduler rs = rm1.getRMContext().getScheduler();
int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
Assert.assertEquals(amMemory + containerMemory, allocatedMB);
// 5. Re-register NM by sending completed container status
List<NMContainerStatus> nMContainerStatusForApp =
createNMContainerStatusForApp(am0);
nm1.registerNode(nMContainerStatusForApp,
Arrays.asList(app0.getApplicationId()));
waitForClusterMemory(nm1, rs, amMemory);
// 6. Verify for Memory Used, it should be 1024
Assert.assertEquals(amMemory, rs.getRootQueueMetrics().getAllocatedMB());
// 7. Send AM heatbeat to RM. Allocated response should contain completed
// container
AllocateRequest req =
AllocateRequest.newInstance(0, 0F, new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>(), null);
AllocateResponse allocate = am0.allocate(req);
List<ContainerStatus> completedContainersStatuses =
allocate.getCompletedContainersStatuses();
Assert.assertEquals(noOfContainers, completedContainersStatuses.size());
// Application clean up should happen Cluster memory used is 0
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
waitForClusterMemory(nm1, rs, 0);
rm1.stop();
}
private void waitForClusterMemory(MockNM nm1, ResourceScheduler rs,
int clusterMemory) throws Exception, InterruptedException {
int counter = 0;
while (rs.getRootQueueMetrics().getAllocatedMB() != clusterMemory) {
nm1.nodeHeartbeat(true);
Thread.sleep(100);
if (counter++ == 50) {
Assert.fail("Wait for cluster memory is timed out.Expected="
+ clusterMemory + " Actual="
+ rs.getRootQueueMetrics().getAllocatedMB());
}
}
}
public static List<NMContainerStatus> createNMContainerStatusForApp(MockAM am) {
List<NMContainerStatus> list = new ArrayList<NMContainerStatus>();
NMContainerStatus amContainer =
createNMContainerStatus(am.getApplicationAttemptId(), 1,
ContainerState.RUNNING, 1024);
NMContainerStatus completedContainer =
createNMContainerStatus(am.getApplicationAttemptId(), 2,
ContainerState.COMPLETE, 2048);
list.add(amContainer);
list.add(completedContainer);
return list;
}
public static NMContainerStatus createNMContainerStatus(
ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
int memory) {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
NMContainerStatus containerReport =
NMContainerStatus.newInstance(containerId, containerState,
Resource.newInstance(memory, 1), "recover container", 0,
Priority.newInstance(0), 0);
return containerReport;
}
public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup();
t.testAppCleanup();

View File

@ -540,7 +540,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
@ -614,7 +614,7 @@ public class TestRMNodeTransitions {
Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
null));
null, null));
Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
}
}