Merge -c 1465731 from trunk to branch-2 for YARN-479. NM retry behavior for connection to RM should be similar for lost heartbeats (Jian He via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1465733 13f79535-47bb-0310-9956-ffa450edef68
Committer: Bikas Saha
Date:      2013-04-08 19:19:33 +00:00
Parent:    45fa8302f4
Commit:    0aa44dc7ac
3 changed files with 144 additions and 26 deletions

File: hadoop-yarn-project/CHANGES.txt

@@ -68,6 +68,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-193. Scheduler.normalizeRequest does not account for allocation
     requests that exceed maximumAllocation limits (Zhijie Shen via bikas)
 
+    YARN-479. NM retry behavior for connection to RM should be similar for
+    lost heartbeats (Jian He via bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES
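Note on the two knobs behind this entry: the retry window and interval that the patch reuses for heartbeats are the existing YarnConfiguration keys RESOURCEMANAGER_CONNECT_WAIT_SECS and RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, which already governed NM registration in registerWithRM(). Below is a minimal sketch of tuning them programmatically; the class name and the 900/30 second values are illustrative assumptions, not defaults introduced by this patch, and per the waitForEver check in the NodeStatusUpdaterImpl hunks further down, a wait value of -1 makes the NM retry indefinitely.

// Sketch only: programmatic tuning of the NM -> RM connect/heartbeat retry knobs.
// The class name and the 900s/30s values are hypothetical; -1 for the wait key
// means retry forever.
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class NmRetryConfigSketch {
  public static YarnConfiguration configure() {
    YarnConfiguration conf = new YarnConfiguration();
    // Total window to keep retrying a failed connection/heartbeat, in seconds.
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, 900);
    // Sleep between consecutive retry attempts, in seconds.
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, 30);
    return conf;
  }
}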

File: NodeStatusUpdaterImpl.java

@@ -87,10 +87,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
-  private boolean previousHeartBeatSucceeded;
-  private List<ContainerStatus> previousContainersStatuses =
-      new ArrayList<ContainerStatus>();
+  private long rmConnectWaitMS;
+  private long rmConnectionRetryIntervalMS;
+  private boolean waitForEver;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -99,7 +98,6 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
-    this.previousHeartBeatSucceeded = true;
   }
 
   @Override
@@ -137,8 +135,8 @@ public synchronized void init(Configuration conf) {
         YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
     LOG.info("Initialized nodemanager for " + nodeId + ":" +
         " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
         " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
     super.init(conf);
   }
@@ -192,12 +190,12 @@ protected ResourceTracker getRMClient() {
   private void registerWithRM() throws YarnRemoteException {
     Configuration conf = getConfig();
-    long rmConnectWaitMS =
+    rmConnectWaitMS =
         conf.getInt(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
             YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
         * 1000;
-    long rmConnectionRetryIntervalMS =
+    rmConnectionRetryIntervalMS =
         conf.getLong(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
             YarnConfiguration
@@ -210,7 +208,7 @@ private void registerWithRM() throws YarnRemoteException {
           " should not be negative.");
     }
-    boolean waitForEver = (rmConnectWaitMS == -1000);
+    waitForEver = (rmConnectWaitMS == -1000);
     if(! waitForEver) {
       if(rmConnectWaitMS < 0) {
@@ -319,14 +317,8 @@ private NodeStatus getNodeStatus() {
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     nodeStatus.setNodeId(this.nodeId);
 
-    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
-    if(previousHeartBeatSucceeded) {
-      previousContainersStatuses.clear();
-    } else {
-      containersStatuses.addAll(previousContainersStatuses);
-    }
     int numActiveContainers = 0;
+    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
@@ -341,7 +333,6 @@ private NodeStatus getNodeStatus() {
           LOG.info("Sending out status for container: " + containerStatus);
           if (containerStatus.getState() == ContainerState.COMPLETE) {
-            previousContainersStatuses.add(containerStatus);
             // Remove
             i.remove();
@@ -404,6 +395,9 @@ public void run() {
         while (!isStopped) {
           // Send heartbeat
           try {
+            NodeHeartbeatResponse response = null;
+            int rmRetryCount = 0;
+            long waitStartTime = System.currentTimeMillis();
             NodeStatus nodeStatus = getNodeStatus();
             nodeStatus.setResponseId(lastHeartBeatID);
@@ -414,9 +408,31 @@ public void run() {
              request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
                .getContainerTokenSecretManager().getCurrentKey());
             }
-            NodeHeartbeatResponse response =
-                resourceTracker.nodeHeartbeat(request);
-            previousHeartBeatSucceeded = true;
+            while (!isStopped) {
+              try {
+                rmRetryCount++;
+                response = resourceTracker.nodeHeartbeat(request);
+                break;
+              } catch (Throwable e) {
+                LOG.warn("Trying to heartbeat to ResourceManager, "
+                    + "current no. of failed attempts is " + rmRetryCount);
+                if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+                    || waitForEver) {
+                  try {
+                    LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+                        + " seconds before next heartbeat to RM");
+                    Thread.sleep(rmConnectionRetryIntervalMS);
+                  } catch(InterruptedException ex) {
+                    //done nothing
+                  }
+                } else {
+                  String errorMessage = "Failed to heartbeat to RM, " +
+                      "no. of failed attempts is "+rmRetryCount;
+                  LOG.error(errorMessage,e);
+                  throw new YarnException(errorMessage,e);
+                }
+              }
+            }
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             // See if the master-key has rolled over
@@ -432,7 +448,7 @@ public void run() {
             if (response.getNodeAction() == NodeAction.SHUTDOWN) {
               LOG
                 .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
                     " hence shutting down.");
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
               break;
@@ -461,8 +477,12 @@ public void run() {
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
+          } catch (YarnException e) {
+            //catch and throw the exception if tried MAX wait time to connect RM
+            dispatcher.getEventHandler().handle(
+                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+            throw e;
           } catch (Throwable e) {
-            previousHeartBeatSucceeded = false;
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);

File: TestNodeStatusUpdater.java

@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -51,9 +52,11 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,6 +170,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         throws YarnRemoteException {
       NodeStatus nodeStatus = request.getNodeStatus();
       LOG.info("Got heartbeat number " + heartBeatID);
+      NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
+      Dispatcher mockDispatcher = mock(Dispatcher.class);
+      EventHandler mockEventHandler = mock(EventHandler.class);
+      when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -183,7 +190,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         launchContext.setContainerId(firstContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(2);
-        Container container = new ContainerImpl(conf , null, launchContext, null, null);
+        Container container = new ContainerImpl(conf , mockDispatcher,
+            launchContext, null, mockMetrics);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -207,7 +215,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         launchContext.setContainerId(secondContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(3);
-        Container container = new ContainerImpl(conf, null, launchContext, null, null);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            launchContext, null, mockMetrics);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -229,13 +238,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
     }
   }
 
   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
-    public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+    public ResourceTracker resourceTracker;
     private Context context;
 
     public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
       super(context, dispatcher, healthChecker, metrics);
       this.context = context;
+      resourceTracker = new MyResourceTracker(this.context);
     }
 
     @Override
@@ -312,6 +322,21 @@ protected ResourceTracker getRMClient() {
     }
   }
 
+  private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
+    private ResourceTracker resourceTracker;
+
+    public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      super(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker5();
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+  }
+
   private class MyNodeManager extends NodeManager {
     private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -328,6 +353,32 @@ protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
     }
   }
 
+  private class MyNodeManager2 extends NodeManager {
+    public boolean isStopped = false;
+    private NodeStatusUpdater nodeStatusUpdater;
+    private CyclicBarrier syncBarrier;
+
+    public MyNodeManager2 (CyclicBarrier syncBarrier) {
+      this.syncBarrier = syncBarrier;
+    }
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      nodeStatusUpdater =
+          new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
+              metrics);
+      return nodeStatusUpdater;
+    }
+
+    @Override
+    public void stop() {
+      super.stop();
+      isStopped = true;
+      try {
+        syncBarrier.await();
+      } catch (Exception e) {
+      }
+    }
+  }
   //
   private class MyResourceTracker2 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -505,6 +556,26 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
     }
   }
 
+  private class MyResourceTracker5 implements ResourceTracker {
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction );
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      heartBeatID++;
+      throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+    }
+  }
+
   @Before
   public void clearError() {
     nmStartError = null;
@@ -883,6 +954,30 @@ protected NMContext createNMContext(
     nm.stop();
   }
 
+  @Test(timeout = 20000)
+  public void testNodeStatusUpdaterRetryAndNMShutdown()
+      throws InterruptedException {
+    final long connectionWaitSecs = 1;
+    final long connectionRetryIntervalSecs = 1;
+    YarnConfiguration conf = createNMConfig();
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+        connectionWaitSecs);
+    conf.setLong(YarnConfiguration
+        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+        connectionRetryIntervalSecs);
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    nm = new MyNodeManager2(syncBarrier);
+    nm.init(conf);
+    nm.start();
+    try {
+      syncBarrier.await();
+    } catch (Exception e) {
+    }
+    Assert.assertTrue(((MyNodeManager2) nm).isStopped);
+    Assert.assertTrue("calculate heartBeatCount based on" +
+        " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
+  }
+
   private class MyNMContext extends NMContext {
     ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();