diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 21fe0cbf1d8..41e35d14457 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; @@ -465,8 +466,14 @@ public class NodeManager extends CompositeService if (!rmWorkPreservingRestartEnabled) { LOG.info("Cleaning up running containers on resync"); containerManager.cleanupContainersOnNMResync(); + // Clear all known collectors for resync. + if (context.getKnownCollectors() != null) { + context.getKnownCollectors().clear(); + } } else { LOG.info("Preserving containers on resync"); + // Re-register known timeline collectors. + reregisterCollectors(); } ((NodeStatusUpdaterImpl) nodeStatusUpdater) .rebootNodeStatusUpdaterAndRegisterWithRM(); @@ -478,6 +485,38 @@ public class NodeManager extends CompositeService }.start(); } + /** + * Reregisters all collectors known by this node to the RM. This method is + * called when the RM needs to resync with the node. + */ + protected void reregisterCollectors() { + Map knownCollectors + = context.getKnownCollectors(); + if (knownCollectors == null) { + return; + } + Map registeringCollectors + = context.getRegisteringCollectors(); + for (Map.Entry entry + : knownCollectors.entrySet()) { + Application app = context.getApplications().get(entry.getKey()); + if ((app != null) + && !ApplicationState.FINISHED.equals(app.getApplicationState())) { + registeringCollectors.putIfAbsent(entry.getKey(), entry.getValue()); + AppCollectorData data = entry.getValue(); + if (LOG.isDebugEnabled()) { + LOG.debug(entry.getKey() + " : " + data.getCollectorAddr() + "@<" + + data.getRMIdentifier() + ", " + data.getVersion() + ">"); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Remove collector data for done app " + entry.getKey()); + } + } + } + knownCollectors.clear(); + } + public static class NMContext implements Context { private NodeId nodeId = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 99f67527fad..2deec6f6a2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -656,17 +656,21 @@ public class ResourceTrackerService extends AbstractService implements LOG.warn("Cannot update collector info because application ID: " + appId + " is not found in RMContext!"); } else { - AppCollectorData previousCollectorData = rmApp.getCollectorData(); - if (AppCollectorData.happensBefore(previousCollectorData, - collectorData)) { - // Sending collector update event. - // Note: RM has to store the newly received collector data - // synchronously. Otherwise, the RM may send out stale collector - // data before this update is done, and the RM then crashes, the - // newly updated collector data will get lost. - LOG.info("Update collector information for application " + appId - + " with new address: " + collectorData.getCollectorAddr()); - ((RMAppImpl) rmApp).setCollectorData(collectorData); + synchronized (rmApp) { + AppCollectorData previousCollectorData = rmApp.getCollectorData(); + if (AppCollectorData.happensBefore(previousCollectorData, + collectorData)) { + // Sending collector update event. + // Note: RM has to store the newly received collector data + // synchronously. Otherwise, the RM may send out stale collector + // data before this update is done, and the RM then crashes, the + // newly updated collector data will get lost. + LOG.info("Update collector information for application " + appId + + " with new address: " + collectorData.getCollectorAddr() + + " timestamp: " + collectorData.getRMIdentifier() + + ", " + collectorData.getVersion()); + ((RMAppImpl) rmApp).setCollectorData(collectorData); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2d7612738ca..4a8ff00099b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -60,6 +62,8 @@ public class MockNM { private String version; private Map containerStats = new HashMap(); + private Map registeringCollectors + = new ConcurrentHashMap<>(); public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -117,6 +121,15 @@ public class MockNM { true, ++responseId); } + public void addRegisteringCollector(ApplicationId appId, + AppCollectorData data) { + this.registeringCollectors.put(appId, data); + } + + public Map getRegisteringCollectors() { + return this.registeringCollectors; + } + public RegisterNodeManagerResponse registerNode() throws Exception { return registerNode(null, null); } @@ -229,6 +242,9 @@ public class MockNM { req.setNodeStatus(status); req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey); req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); + + req.setRegisteringCollectors(this.registeringCollectors); + NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java new file mode 100644 index 00000000000..a54ff341c4c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Test if the new active RM could recover collector status on a state + * transition. + */ +public class TestRMHATimelineCollectors extends RMHATestBase { + public static final Log LOG = LogFactory + .getLog(TestSubmitApplicationWithRMHA.class); + + @Before + @Override + public void setup() throws Exception { + super.setup(); + confForRM1.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + confForRM2.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + confForRM1.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + confForRM2.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + } + + @Test + public void testRebuildCollectorDataOnFailover() throws Exception { + startRMs(); + MockNM nm1 + = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + MockNM nm2 + = new MockNM("127.0.0.1:5678", 15121, rm2.getResourceTrackerService()); + RMApp app1 = rm1.submitApp(1024); + String collectorAddr1 = "1.2.3.4:5"; + AppCollectorData data1 = AppCollectorData.newInstance( + app1.getApplicationId(), collectorAddr1); + nm1.addRegisteringCollector(app1.getApplicationId(), data1); + + String collectorAddr2 = "5.4.3.2:1"; + RMApp app2 = rm1.submitApp(1024); + AppCollectorData data2 = AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr2, rm1.getStartTime(), 1); + nm1.addRegisteringCollector(app2.getApplicationId(), data2); + + explicitFailover(); + + List runningApps = new ArrayList<>(); + runningApps.add(app1.getApplicationId()); + runningApps.add(app2.getApplicationId()); + nm1.registerNode(runningApps); + nm2.registerNode(runningApps); + + String collectorAddr12 = "1.2.3.4:56"; + AppCollectorData data12 = AppCollectorData.newInstance( + app1.getApplicationId(), collectorAddr12, rm1.getStartTime(), 0); + nm2.addRegisteringCollector(app1.getApplicationId(), data12); + + String collectorAddr22 = "5.4.3.2:10"; + AppCollectorData data22 = AppCollectorData.newInstance( + app2.getApplicationId(), collectorAddr22, rm1.getStartTime(), 2); + nm2.addRegisteringCollector(app2.getApplicationId(), data22); + + Map results1 + = nm1.nodeHeartbeat(true).getAppCollectors(); + assertEquals(collectorAddr1, + results1.get(app1.getApplicationId()).getCollectorAddr()); + assertEquals(collectorAddr2, + results1.get(app2.getApplicationId()).getCollectorAddr()); + + Map results2 + = nm2.nodeHeartbeat(true).getAppCollectors(); + // addr of app1 should be collectorAddr1 since it's registering (no time + // stamp). + assertEquals(collectorAddr1, + results2.get(app1.getApplicationId()).getCollectorAddr()); + // addr of app2 should be collectorAddr22 since its version number is + // greater. + assertEquals(collectorAddr22, + results2.get(app2.getApplicationId()).getCollectorAddr()); + + // Now nm1 should get updated collector list + nm1.getRegisteringCollectors().clear(); + Map results12 + = nm1.nodeHeartbeat(true).getAppCollectors(); + assertEquals(collectorAddr1, + results12.get(app1.getApplicationId()).getCollectorAddr()); + assertEquals(collectorAddr22, + results12.get(app2.getApplicationId()).getCollectorAddr()); + + + } +}