YARN-101. Fix NodeManager heartbeat processing to not lose track of completed containers in case of dropped heartbeats. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1464105 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1464106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-03 16:57:40 +00:00
parent 523e98d914
commit c582c13c30
4 changed files with 265 additions and 2 deletions

View File

@ -125,6 +125,9 @@ Release 2.0.5-beta - UNRELEASED
local directory hits unix file count limits and thus prevent job failures. local directory hits unix file count limits and thus prevent job failures.
(Omkar Vinit Joshi via vinodkv) (Omkar Vinit Joshi via vinodkv)
YARN-101. Fix NodeManager heartbeat processing to not lose track of completed
containers in case of dropped heartbeats. (Xuan Gong via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -119,6 +119,10 @@ public class NodeManager extends CompositeService
return new DeletionService(exec); return new DeletionService(exec);
} }
protected NMContext createNMContext(NMContainerTokenSecretManager containerTokenSecretManager) {
return new NMContext(containerTokenSecretManager);
}
protected void doSecureLogin() throws IOException { protected void doSecureLogin() throws IOException {
SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB, SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
YarnConfiguration.NM_PRINCIPAL); YarnConfiguration.NM_PRINCIPAL);
@ -137,7 +141,7 @@ public class NodeManager extends CompositeService
containerTokenSecretManager = new NMContainerTokenSecretManager(conf); containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
} }
this.context = new NMContext(containerTokenSecretManager); this.context = createNMContext(containerTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf); this.aclsManager = new ApplicationACLsManager(conf);

View File

@ -88,6 +88,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final NodeHealthCheckerService healthChecker; private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
private boolean previousHeartBeatSucceeded;
private List<ContainerStatus> previousContainersStatuses =
new ArrayList<ContainerStatus>();
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(NodeStatusUpdaterImpl.class.getName()); super(NodeStatusUpdaterImpl.class.getName());
@ -95,6 +99,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.context = context; this.context = context;
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.metrics = metrics; this.metrics = metrics;
this.previousHeartBeatSucceeded = true;
} }
@Override @Override
@ -314,8 +319,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId); nodeStatus.setNodeId(this.nodeId);
int numActiveContainers = 0;
List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>(); List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
if(previousHeartBeatSucceeded) {
previousContainersStatuses.clear();
} else {
containersStatuses.addAll(previousContainersStatuses);
}
int numActiveContainers = 0;
for (Iterator<Entry<ContainerId, Container>> i = for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) { this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next(); Entry<ContainerId, Container> e = i.next();
@ -330,6 +341,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
LOG.info("Sending out status for container: " + containerStatus); LOG.info("Sending out status for container: " + containerStatus);
if (containerStatus.getState() == ContainerState.COMPLETE) { if (containerStatus.getState() == ContainerState.COMPLETE) {
previousContainersStatuses.add(containerStatus);
// Remove // Remove
i.remove(); i.remove();
@ -404,6 +416,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
NodeHeartbeatResponse response = NodeHeartbeatResponse response =
resourceTracker.nodeHeartbeat(request); resourceTracker.nodeHeartbeat(request);
previousHeartBeatSucceeded = true;
//get next heartbeat interval from response //get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval(); nextHeartBeatInterval = response.getNextHeartBeatInterval();
// See if the master-key has rolled over // See if the master-key has rolled over
@ -449,6 +462,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new CMgrCompletedAppsEvent(appsToCleanup)); new CMgrCompletedAppsEvent(appsToCleanup));
} }
} catch (Throwable e) { } catch (Throwable e) {
previousHeartBeatSucceeded = false;
// TODO Better error handling. Thread can die with the rest of the // TODO Better error handling. Thread can die with the rest of the
// NM still running. // NM still running.
LOG.error("Caught exception in status-updater", e); LOG.error("Caught exception in status-updater", e);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -29,6 +30,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -58,11 +61,13 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
@ -92,6 +97,8 @@ public class TestNodeStatusUpdater {
private final Configuration conf = createNMConfig(); private final Configuration conf = createNMConfig();
private NodeManager nm; private NodeManager nm;
protected NodeManager rebootedNodeManager; protected NodeManager rebootedNodeManager;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
@After @After
public void tearDown() { public void tearDown() {
@ -237,6 +244,22 @@ public class TestNodeStatusUpdater {
} }
} }
private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
resourceTracker = new MyResourceTracker4(context);
}
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
}
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker; public ResourceTracker resourceTracker;
private Context context; private Context context;
@ -384,6 +407,104 @@ public class TestNodeStatusUpdater {
} }
} }
private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL;
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
private Context context;
public MyResourceTracker4(Context context) {
this.context = context;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction);
return response;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
try {
if (heartBeatID == 0) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
.size(), 0);
Assert.assertEquals(context.getContainers().size(), 0);
} else if (heartBeatID == 1) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
.size(), 5);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(0).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(0)
.getContainerId().getId() == 1);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(1).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(1)
.getContainerId().getId() == 2);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(2).getState() == ContainerState.COMPLETE
&& request.getNodeStatus().getContainersStatuses().get(2)
.getContainerId().getId() == 3);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(3).getState() == ContainerState.COMPLETE
&& request.getNodeStatus().getContainersStatuses().get(3)
.getContainerId().getId() == 4);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(4).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(4)
.getContainerId().getId() == 5);
throw new YarnException("Lost the heartbeat response");
} else if (heartBeatID == 2) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
.size(), 7);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(0).getState() == ContainerState.COMPLETE
&& request.getNodeStatus().getContainersStatuses().get(0)
.getContainerId().getId() == 3);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(1).getState() == ContainerState.COMPLETE
&& request.getNodeStatus().getContainersStatuses().get(1)
.getContainerId().getId() == 4);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(2).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(2)
.getContainerId().getId() == 1);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(3).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(3)
.getContainerId().getId() == 2);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(4).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(4)
.getContainerId().getId() == 5);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(5).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(5)
.getContainerId().getId() == 6);
Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
.get(6).getState() == ContainerState.COMPLETE
&& request.getNodeStatus().getContainersStatuses().get(6)
.getContainerId().getId() == 7);
}
} catch (AssertionError error) {
LOG.info(error);
containerStatusBackupSuccessfully = false;
} finally {
heartBeatID++;
}
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, 1000L);
return nhResponse;
}
}
@Before @Before
public void clearError() { public void clearError() {
nmStartError = null; nmStartError = null;
@ -725,6 +846,127 @@ public class TestNodeStatusUpdater {
} }
} }
/**
* Test completed containerStatus get back up when heart beat lost
*/
@Test(timeout = 20000)
public void testCompletedContainerStatusBackup() throws Exception {
nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
MyNodeStatusUpdater2 myNodeStatusUpdater =
new MyNodeStatusUpdater2(context, dispatcher, healthChecker,
metrics);
return myNodeStatusUpdater;
}
@Override
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager) {
return new MyNMContext(containerTokenSecretManager);
}
};
YarnConfiguration conf = createNMConfig();
nm.init(conf);
nm.start();
int waitCount = 0;
while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500);
}
if(!containerStatusBackupSuccessfully) {
Assert.fail("ContainerStatus Backup failed");
}
nm.stop();
}
private class MyNMContext extends NMContext {
ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
public MyNMContext(NMContainerTokenSecretManager
containerTokenSecretManager) {
super(containerTokenSecretManager);
}
@Override
public ConcurrentMap<ContainerId, Container> getContainers() {
if (heartBeatID == 0) {
return containers;
} else if (heartBeatID == 1) {
ContainerStatus containerStatus1 =
createContainerStatus(1, ContainerState.RUNNING);
Container container1 = getMockContainer(containerStatus1);
containers.put(containerStatus1.getContainerId(), container1);
ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING);
Container container2 = getMockContainer(containerStatus2);
containers.put(containerStatus2.getContainerId(), container2);
ContainerStatus containerStatus3 =
createContainerStatus(3, ContainerState.COMPLETE);
Container container3 = getMockContainer(containerStatus3);
containers.put(containerStatus3.getContainerId(), container3);
completedContainerStatusList.add(containerStatus3);
ContainerStatus containerStatus4 =
createContainerStatus(4, ContainerState.COMPLETE);
Container container4 = getMockContainer(containerStatus4);
containers.put(containerStatus4.getContainerId(), container4);
completedContainerStatusList.add(containerStatus4);
ContainerStatus containerStatus5 =
createContainerStatus(5, ContainerState.RUNNING);
Container container5 = getMockContainer(containerStatus5);
containers.put(containerStatus5.getContainerId(), container5);
return containers;
} else if (heartBeatID == 2) {
ContainerStatus containerStatus6 =
createContainerStatus(6, ContainerState.RUNNING);
Container container6 = getMockContainer(containerStatus6);
containers.put(containerStatus6.getContainerId(), container6);
ContainerStatus containerStatus7 =
createContainerStatus(7, ContainerState.COMPLETE);
Container container7 = getMockContainer(containerStatus7);
containers.put(containerStatus7.getContainerId(), container7);
completedContainerStatusList.add(containerStatus7);
return containers;
} else {
containers.clear();
return containers;
}
}
private ContainerStatus createContainerStatus(int id,
ContainerState containerState) {
ApplicationId applicationId =
BuilderUtils.newApplicationId(System.currentTimeMillis(), id);
ApplicationAttemptId applicationAttemptId =
BuilderUtils.newApplicationAttemptId(applicationId, id);
ContainerId contaierId =
BuilderUtils.newContainerId(applicationAttemptId, id);
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(contaierId, containerState,
"test_containerStatus: id=" + id + ", containerState: "
+ containerState, 0);
return containerStatus;
}
private Container getMockContainer(ContainerStatus containerStatus) {
Container container = mock(Container.class);
when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
return container;
}
}
private void verifyNodeStartFailure(String errMessage) { private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);