diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 93c276c02f8..cdde2aa11a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -544,16 +543,15 @@ public class ResourceTrackerService extends AbstractService implements nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } - List keepAliveApps = - remoteNodeStatus.getKeepAliveApplications(); - if (timelineV2Enabled && keepAliveApps != null) { + if (timelineV2Enabled) { // Return collectors' map that NM needs to know - // TODO we should optimize this to only include collector info that NM - // doesn't know yet. - setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); + setAppCollectorsMapToResponse(rmNode.getRunningApps(), + nodeHeartBeatResponse); } // 4. Send status to RMNode, saving the latest response. + List keepAliveApps = + remoteNodeStatus.getKeepAliveApplications(); RMNodeStatusEvent nodeStatusEvent = new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); if (request.getLogAggregationReportsForApps() != null @@ -597,18 +595,20 @@ public class ResourceTrackerService extends AbstractService implements } private void setAppCollectorsMapToResponse( - List liveApps, NodeHeartbeatResponse response) { + List runningApps, NodeHeartbeatResponse response) { Map liveAppCollectorsMap = new - ConcurrentHashMap(); + HashMap(); Map rmApps = rmContext.getRMApps(); - // Set collectors for all apps now. - // TODO set collectors for only active apps running on NM (liveApps cannot be - // used for this case) - for (Map.Entry rmApp : rmApps.entrySet()) { - ApplicationId appId = rmApp.getKey(); - String appCollectorAddr = rmApp.getValue().getCollectorAddr(); + // Set collectors for all running apps on this node. + for (ApplicationId appId : runningApps) { + String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); if (appCollectorAddr != null) { liveAppCollectorsMap.put(appId, appCollectorAddr); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Collector for applicaton: " + appId + + " hasn't registered yet!"); + } } } response.setAppCollectorsMap(liveAppCollectorsMap); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 73bef0c7b71..809ef4aaace 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -68,8 +70,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -864,6 +869,83 @@ public class TestResourceTrackerService extends NodeLabelTestBase { checkRebootedNMCount(rm, ++initialMetricCount); } + @Test + public void testNodeHeartbeatForAppCollectorsMap() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // set version to 2 + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + // enable aux-service based timeline collectors + conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector"); + conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + + "timeline_collector" + ".class", + PerNodeTimelineCollectorsAuxService.class.getName()); + + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:1234", 2048); + + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + + RMNodeImpl node1 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + RMNodeImpl node2 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + RMApp app1 = rm.submitApp(1024); + String collectorAddr1 = "1.2.3.4:5"; + app1.setCollectorAddr(collectorAddr1); + + String collectorAddr2 = "5.4.3.2:1"; + RMApp app2 = rm.submitApp(1024); + app2.setCollectorAddr(collectorAddr2); + + // Create a running container for app1 running on nm1 + ContainerId runningContainerId1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + app1.getApplicationId(), 0), 0); + + ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1, + ContainerState.RUNNING, "", 0); + List statusList = new ArrayList(); + statusList.add(status1); + NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, + "", System.currentTimeMillis()); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth, + statusList, null, nodeHeartbeat1)); + + Assert.assertEquals(1, node1.getRunningApps().size()); + Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0)); + + // Create a running container for app2 running on nm2 + ContainerId runningContainerId2 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + app2.getApplicationId(), 0), 0); + + ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2, + ContainerState.RUNNING, "", 0); + statusList = new ArrayList(); + statusList.add(status2); + node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeHealth, + statusList, null, nodeHeartbeat2)); + Assert.assertEquals(1, node2.getRunningApps().size()); + Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + Map map1 = nodeHeartbeat1.getAppCollectorsMap(); + Assert.assertEquals(1, map1.size()); + Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId())); + + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + Map map2 = nodeHeartbeat2.getAppCollectorsMap(); + Assert.assertEquals(1, map2.size()); + Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId())); + } + private void checkRebootedNMCount(MockRM rm2, int count) throws InterruptedException {