YARN-309. Changed NodeManager to obtain heart-beat interval from the ResourceManager. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1463346 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1463347 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-02 01:05:19 +00:00
parent f47bdbded7
commit 9baac1676a
12 changed files with 154 additions and 40 deletions

View File

@ -51,6 +51,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-475. Remove a unused constant in the public API - YARN-475. Remove a unused constant in the public API -
ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. (Hitesh Shah via vinodkv) ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. (Hitesh Shah via vinodkv)
YARN-309. Changed NodeManager to obtain heart-beat interval from the
ResourceManager. (Xuan Gong via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -219,6 +219,11 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_SCHEDULER = public static final String DEFAULT_RM_SCHEDULER =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
/** RM set next Heartbeat interval for NM */
public static final String RM_NM_HEARTBEAT_INTERVAL_MS =
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
//Delegation token related keys //Delegation token related keys
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval"; RM_PREFIX + "delegation.key.update-interval";
@ -329,12 +334,6 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "delete.thread-count"; NM_PREFIX + "delete.thread-count";
public static final int DEFAULT_NM_DELETE_THREAD_COUNT = 4; public static final int DEFAULT_NM_DELETE_THREAD_COUNT = 4;
// TODO: Should this instead be dictated by RM?
/** Heartbeat interval to RM*/
public static final String NM_TO_RM_HEARTBEAT_INTERVAL_MS =
NM_PREFIX + "heartbeat.interval-ms";
public static final int DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
/** Keytab for NM.*/ /** Keytab for NM.*/
public static final String NM_KEYTAB = NM_PREFIX + "keytab"; public static final String NM_KEYTAB = NM_PREFIX + "keytab";

View File

@ -278,6 +278,12 @@
<value>86400</value> <value>86400</value>
</property> </property>
<property>
<description>The heart-beat interval in milliseconds for every NodeManager in the cluster.</description>
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-ms</name>
<value>1000</value>
</property>
<!-- Node Manager Configs --> <!-- Node Manager Configs -->
<property> <property>
<description>The address of the container manager in the NM.</description> <description>The address of the container manager in the NM.</description>
@ -336,12 +342,6 @@
<value>0</value> <value>0</value>
</property> </property>
<property>
<description>Heartbeat interval to RM</description>
<name>yarn.nodemanager.heartbeat.interval-ms</name>
<value>1000</value>
</property>
<property> <property>
<description>Keytab for NM.</description> <description>Keytab for NM.</description>
<name>yarn.nodemanager.keytab</name> <name>yarn.nodemanager.keytab</name>

View File

@ -42,4 +42,7 @@ public interface NodeHeartbeatResponse {
void addAllContainersToCleanup(List<ContainerId> containers); void addAllContainersToCleanup(List<ContainerId> containers);
void addAllApplicationsToCleanup(List<ApplicationId> applications); void addAllApplicationsToCleanup(List<ApplicationId> applications);
long getNextHeartBeatInterval();
void setNextHeartBeatInterval(long nextHeartBeatInterval);
} }

View File

@ -271,6 +271,18 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
builder.addAllApplicationsToCleanup(iterable); builder.addAllApplicationsToCleanup(iterable);
} }
@Override
public long getNextHeartBeatInterval() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getNextHeartBeatInterval());
}
@Override
public void setNextHeartBeatInterval(long nextHeartBeatInterval) {
maybeInitBuilder();
builder.setNextHeartBeatInterval(nextHeartBeatInterval);
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p); return new ContainerIdPBImpl(p);
} }

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.utils;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
/**
* Server Builder utilities to construct various objects.
*
*/
public class YarnServerBuilderUtils {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
NodeAction action, List<ContainerId> containersToCleanUp,
List<ApplicationId> applicationsToCleanUp,
MasterKey masterKey, long nextHeartbeatInterval) {
NodeHeartbeatResponse response = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
response.setResponseId(responseId);
response.setNodeAction(action);
response.setMasterKey(masterKey);
response.setNextHeartBeatInterval(nextHeartbeatInterval);
if(containersToCleanUp != null) {
response.addAllContainersToCleanup(containersToCleanUp);
}
if(applicationsToCleanUp != null) {
response.addAllApplicationsToCleanup(applicationsToCleanUp);
}
return response;
}
}

View File

@ -47,4 +47,5 @@ message NodeHeartbeatResponseProto {
optional NodeActionProto nodeAction = 3; optional NodeActionProto nodeAction = 3;
repeated ContainerIdProto containers_to_cleanup = 4; repeated ContainerIdProto containers_to_cleanup = 4;
repeated ApplicationIdProto applications_to_cleanup = 5; repeated ApplicationIdProto applications_to_cleanup = 5;
optional int64 nextHeartBeatInterval = 6;
} }

View File

@ -71,7 +71,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
private NodeId nodeId; private NodeId nodeId;
private long heartBeatInterval; private long nextHeartBeatInterval;
private ResourceTracker resourceTracker; private ResourceTracker resourceTracker;
private InetSocketAddress rmAddress; private InetSocketAddress rmAddress;
private Resource totalResource; private Resource totalResource;
@ -103,9 +103,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
this.heartBeatInterval =
conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
int memoryMb = int memoryMb =
conf.getInt( conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB); YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
@ -394,9 +392,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
while (!isStopped) { while (!isStopped) {
// Send heartbeat // Send heartbeat
try { try {
synchronized (heartbeatMonitor) {
heartbeatMonitor.wait(heartBeatInterval);
}
NodeStatus nodeStatus = getNodeStatus(); NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID); nodeStatus.setResponseId(lastHeartBeatID);
@ -409,7 +404,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
NodeHeartbeatResponse response = NodeHeartbeatResponse response =
resourceTracker.nodeHeartbeat(request); resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
// See if the master-key has rolled over // See if the master-key has rolled over
if (isSecurityEnabled()) { if (isSecurityEnabled()) {
MasterKey updatedMasterKey = response.getMasterKey(); MasterKey updatedMasterKey = response.getMasterKey();
@ -456,6 +452,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// TODO Better error handling. Thread can die with the rest of the // TODO Better error handling. Thread can die with the rest of the
// NM still running. // NM still running.
LOG.error("Caught exception in status-updater", e); LOG.error("Caught exception in status-updater", e);
} finally {
synchronized (heartbeatMonitor) {
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
nextHeartBeatInterval;
try {
heartbeatMonitor.wait(nextHeartBeatInterval);
} catch (InterruptedException e) {
// Do Nothing
}
}
} }
} }
} }

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
/** /**
* This class allows a node manager to run without without communicating with a * This class allows a node manager to run without without communicating with a
@ -73,9 +74,9 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
LOG.info("Got heartbeat number " + heartBeatID); LOG.info("Got heartbeat number " + heartBeatID);
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
NodeHeartbeatResponse nhResponse = recordFactory NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
.newRecordInstance(NodeHeartbeatResponse.class); .newNodeHeartbeatResponse(heartBeatID, null, null,
nhResponse.setResponseId(heartBeatID); null, null, 1000L);
return nhResponse; return nhResponse;
} }
} }

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE; import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
@ -214,9 +215,8 @@ public class TestNodeStatusUpdater {
Assert.assertEquals(2, activeContainers.size()); Assert.assertEquals(2, activeContainers.size());
} }
NodeHeartbeatResponse nhResponse = recordFactory NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
.newRecordInstance(NodeHeartbeatResponse.class); newNodeHeartbeatResponse(heartBeatID, null, null, null, null, 1000L);
nhResponse.setResponseId(heartBeatID);
return nhResponse; return nhResponse;
} }
} }
@ -325,10 +325,9 @@ public class TestNodeStatusUpdater {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
NodeHeartbeatResponse nhResponse = recordFactory NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
.newRecordInstance(NodeHeartbeatResponse.class); newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
nhResponse.setResponseId(heartBeatID); null, null, 1000L);
nhResponse.setNodeAction(heartBeatNodeAction);
return nhResponse; return nhResponse;
} }
} }
@ -361,10 +360,9 @@ public class TestNodeStatusUpdater {
LOG.info("Got heartBeatId: [" + heartBeatID +"]"); LOG.info("Got heartBeatId: [" + heartBeatID +"]");
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
NodeHeartbeatResponse nhResponse = NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
recordFactory.newRecordInstance(NodeHeartbeatResponse.class); newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
nhResponse.setResponseId(heartBeatID); null, null, 1000L);
nhResponse.setNodeAction(heartBeatNodeAction);
if (nodeStatus.getKeepAliveApplications() != null if (nodeStatus.getKeepAliveApplications() != null
&& nodeStatus.getKeepAliveApplications().size() > 0) { && nodeStatus.getKeepAliveApplications().size() > 0) {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
@ -67,6 +69,7 @@ public class ResourceTrackerService extends AbstractService implements
private final NMLivelinessMonitor nmLivelinessMonitor; private final NMLivelinessMonitor nmLivelinessMonitor;
private final RMContainerTokenSecretManager containerTokenSecretManager; private final RMContainerTokenSecretManager containerTokenSecretManager;
private long nextHeartBeatInterval;
private Server server; private Server server;
private InetSocketAddress resourceTrackerAddress; private InetSocketAddress resourceTrackerAddress;
@ -100,6 +103,14 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
RackResolver.init(conf); RackResolver.init(conf);
nextHeartBeatInterval =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
if (nextHeartBeatInterval <= 0) {
throw new YarnException("Invalid Configuration. "
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
+ " should be larger than 0.");
}
super.init(conf); super.init(conf);
} }
@ -224,9 +235,6 @@ public class ResourceTrackerService extends AbstractService implements
return shutDown; return shutDown;
} }
NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
@ -246,10 +254,11 @@ public class ResourceTrackerService extends AbstractService implements
} }
// Heartbeat response // Heartbeat response
nodeHeartBeatResponse.setResponseId(lastNodeHeartbeatResponse.getResponseId() + 1); NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
getResponseId() + 1, NodeAction.NORMAL, null, null, null,
nextHeartBeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
nodeHeartBeatResponse.setNodeAction(NodeAction.NORMAL);
// Check if node's masterKey needs to be updated and if the currentKey has // Check if node's masterKey needs to be updated and if the currentKey has
// roller over, send it across // roller over, send it across
if (isSecurityEnabled()) { if (isSecurityEnabled()) {

View File

@ -53,6 +53,29 @@ public class TestResourceTrackerService {
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
private MockRM rm; private MockRM rm;
/**
* Test RM read NM next heartBeat Interval correctly from Configuration file,
* and NM get next heartBeat Interval from RM correctly
*/
@Test (timeout = 5000)
public void testGetNextHeartBeatInterval() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "4000");
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(4000, nodeHeartbeat.getNextHeartBeatInterval());
NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
Assert.assertEquals(4000, nodeHeartbeat2.getNextHeartBeatInterval());
}
/** /**
* Decommissioning using a pre-configured include hosts file * Decommissioning using a pre-configured include hosts file
*/ */