YARN-3586. RM to only get back addresses of Collectors that NM needs to know.

(Junping Du via Varun Saxena).
This commit is contained in:
Varun Saxena 2015-12-22 20:58:54 +05:30 committed by Sangjin Lee
parent eb0ac8efb1
commit 829cceebc0
2 changed files with 97 additions and 15 deletions

View File

@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@ -544,16 +543,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
List<ApplicationId> keepAliveApps =
remoteNodeStatus.getKeepAliveApplications();
if (timelineV2Enabled && keepAliveApps != null) {
if (timelineV2Enabled) {
// Return collectors' map that NM needs to know
// TODO we should optimize this to only include collector info that NM
// doesn't know yet.
setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
setAppCollectorsMapToResponse(rmNode.getRunningApps(),
nodeHeartBeatResponse);
}
// 4. Send status to RMNode, saving the latest response.
List<ApplicationId> keepAliveApps =
remoteNodeStatus.getKeepAliveApplications();
RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
if (request.getLogAggregationReportsForApps() != null
@ -597,18 +595,20 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
private void setAppCollectorsMapToResponse(
List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
Map<ApplicationId, String> liveAppCollectorsMap = new
ConcurrentHashMap<ApplicationId, String>();
HashMap<ApplicationId, String>();
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
// Set collectors for all apps now.
// TODO set collectors for only active apps running on NM (liveApps cannot be
// used for this case)
for (Map.Entry<ApplicationId, RMApp> rmApp : rmApps.entrySet()) {
ApplicationId appId = rmApp.getKey();
String appCollectorAddr = rmApp.getValue().getCollectorAddr();
// Set collectors for all running apps on this node.
for (ApplicationId appId : runningApps) {
String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
if (appCollectorAddr != null) {
liveAppCollectorsMap.put(appId, appCollectorAddr);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Collector for applicaton: " + appId +
" hasn't registered yet!");
}
}
}
response.setAppCollectorsMap(liveAppCollectorsMap);

View File

@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -61,6 +62,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
@ -68,8 +70,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
@ -864,6 +869,83 @@ public void testReboot() throws Exception {
checkRebootedNMCount(rm, ++initialMetricCount);
}
@Test
public void testNodeHeartbeatForAppCollectorsMap() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// set version to 2
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
// enable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector");
conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
+ "timeline_collector" + ".class",
PerNodeTimelineCollectorsAuxService.class.getName());
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:1234", 2048);
NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
RMNodeImpl node1 =
(RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNodeImpl node2 =
(RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId());
RMApp app1 = rm.submitApp(1024);
String collectorAddr1 = "1.2.3.4:5";
app1.setCollectorAddr(collectorAddr1);
String collectorAddr2 = "5.4.3.2:1";
RMApp app2 = rm.submitApp(1024);
app2.setCollectorAddr(collectorAddr2);
// Create a running container for app1 running on nm1
ContainerId runningContainerId1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
app1.getApplicationId(), 0), 0);
ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1,
ContainerState.RUNNING, "", 0);
List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
statusList.add(status1);
NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
"", System.currentTimeMillis());
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth,
statusList, null, nodeHeartbeat1));
Assert.assertEquals(1, node1.getRunningApps().size());
Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0));
// Create a running container for app2 running on nm2
ContainerId runningContainerId2 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
app2.getApplicationId(), 0), 0);
ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2,
ContainerState.RUNNING, "", 0);
statusList = new ArrayList<ContainerStatus>();
statusList.add(status2);
node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeHealth,
statusList, null, nodeHeartbeat2));
Assert.assertEquals(1, node2.getRunningApps().size());
Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
nodeHeartbeat1 = nm1.nodeHeartbeat(true);
Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap();
Assert.assertEquals(1, map1.size());
Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId()));
nodeHeartbeat2 = nm2.nodeHeartbeat(true);
Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap();
Assert.assertEquals(1, map2.size());
Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId()));
}
private void checkRebootedNMCount(MockRM rm2, int count)
throws InterruptedException {