YARN-3359. Recover collector list when RM fails over (Li Lu via Varun Saxena)
This commit is contained in:
parent
bcb999939d
commit
1c8a57550b
|
@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
|
import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
@ -459,8 +460,14 @@ public class NodeManager extends CompositeService
|
||||||
if (!rmWorkPreservingRestartEnabled) {
|
if (!rmWorkPreservingRestartEnabled) {
|
||||||
LOG.info("Cleaning up running containers on resync");
|
LOG.info("Cleaning up running containers on resync");
|
||||||
containerManager.cleanupContainersOnNMResync();
|
containerManager.cleanupContainersOnNMResync();
|
||||||
|
// Clear all known collectors for resync.
|
||||||
|
if (context.getKnownCollectors() != null) {
|
||||||
|
context.getKnownCollectors().clear();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Preserving containers on resync");
|
LOG.info("Preserving containers on resync");
|
||||||
|
// Re-register known timeline collectors.
|
||||||
|
reregisterCollectors();
|
||||||
}
|
}
|
||||||
((NodeStatusUpdaterImpl) nodeStatusUpdater)
|
((NodeStatusUpdaterImpl) nodeStatusUpdater)
|
||||||
.rebootNodeStatusUpdaterAndRegisterWithRM();
|
.rebootNodeStatusUpdaterAndRegisterWithRM();
|
||||||
|
@ -472,6 +479,38 @@ public class NodeManager extends CompositeService
|
||||||
}.start();
|
}.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reregisters all collectors known by this node to the RM. This method is
|
||||||
|
* called when the RM needs to resync with the node.
|
||||||
|
*/
|
||||||
|
protected void reregisterCollectors() {
|
||||||
|
Map<ApplicationId, AppCollectorData> knownCollectors
|
||||||
|
= context.getKnownCollectors();
|
||||||
|
if (knownCollectors == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Map<ApplicationId, AppCollectorData> registeringCollectors
|
||||||
|
= context.getRegisteringCollectors();
|
||||||
|
for (Map.Entry<ApplicationId, AppCollectorData> entry
|
||||||
|
: knownCollectors.entrySet()) {
|
||||||
|
Application app = context.getApplications().get(entry.getKey());
|
||||||
|
if ((app != null)
|
||||||
|
&& !ApplicationState.FINISHED.equals(app.getApplicationState())) {
|
||||||
|
registeringCollectors.putIfAbsent(entry.getKey(), entry.getValue());
|
||||||
|
AppCollectorData data = entry.getValue();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(entry.getKey() + " : " + data.getCollectorAddr() + "@<"
|
||||||
|
+ data.getRMIdentifier() + ", " + data.getVersion() + ">");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Remove collector data for done app " + entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
knownCollectors.clear();
|
||||||
|
}
|
||||||
|
|
||||||
public static class NMContext implements Context {
|
public static class NMContext implements Context {
|
||||||
|
|
||||||
private NodeId nodeId = null;
|
private NodeId nodeId = null;
|
||||||
|
|
|
@ -655,17 +655,21 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
LOG.warn("Cannot update collector info because application ID: " +
|
LOG.warn("Cannot update collector info because application ID: " +
|
||||||
appId + " is not found in RMContext!");
|
appId + " is not found in RMContext!");
|
||||||
} else {
|
} else {
|
||||||
AppCollectorData previousCollectorData = rmApp.getCollectorData();
|
synchronized (rmApp) {
|
||||||
if (AppCollectorData.happensBefore(previousCollectorData,
|
AppCollectorData previousCollectorData = rmApp.getCollectorData();
|
||||||
collectorData)) {
|
if (AppCollectorData.happensBefore(previousCollectorData,
|
||||||
// Sending collector update event.
|
collectorData)) {
|
||||||
// Note: RM has to store the newly received collector data
|
// Sending collector update event.
|
||||||
// synchronously. Otherwise, the RM may send out stale collector
|
// Note: RM has to store the newly received collector data
|
||||||
// data before this update is done, and the RM then crashes, the
|
// synchronously. Otherwise, the RM may send out stale collector
|
||||||
// newly updated collector data will get lost.
|
// data before this update is done, and the RM then crashes, the
|
||||||
LOG.info("Update collector information for application " + appId
|
// newly updated collector data will get lost.
|
||||||
+ " with new address: " + collectorData.getCollectorAddr());
|
LOG.info("Update collector information for application " + appId
|
||||||
((RMAppImpl) rmApp).setCollectorData(collectorData);
|
+ " with new address: " + collectorData.getCollectorAddr()
|
||||||
|
+ " timestamp: " + collectorData.getRMIdentifier()
|
||||||
|
+ ", " + collectorData.getVersion());
|
||||||
|
((RMAppImpl) rmApp).setCollectorData(collectorData);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
@ -60,6 +62,8 @@ public class MockNM {
|
||||||
private String version;
|
private String version;
|
||||||
private Map<ContainerId, ContainerStatus> containerStats =
|
private Map<ContainerId, ContainerStatus> containerStats =
|
||||||
new HashMap<ContainerId, ContainerStatus>();
|
new HashMap<ContainerId, ContainerStatus>();
|
||||||
|
private Map<ApplicationId, AppCollectorData> registeringCollectors
|
||||||
|
= new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
||||||
// scale vcores based on the requested memory
|
// scale vcores based on the requested memory
|
||||||
|
@ -117,6 +121,15 @@ public class MockNM {
|
||||||
true, ++responseId);
|
true, ++responseId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addRegisteringCollector(ApplicationId appId,
|
||||||
|
AppCollectorData data) {
|
||||||
|
this.registeringCollectors.put(appId, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
|
||||||
|
return this.registeringCollectors;
|
||||||
|
}
|
||||||
|
|
||||||
public RegisterNodeManagerResponse registerNode() throws Exception {
|
public RegisterNodeManagerResponse registerNode() throws Exception {
|
||||||
return registerNode(null, null);
|
return registerNode(null, null);
|
||||||
}
|
}
|
||||||
|
@ -223,6 +236,9 @@ public class MockNM {
|
||||||
req.setNodeStatus(status);
|
req.setNodeStatus(status);
|
||||||
req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
|
req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
|
||||||
req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
|
req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
|
||||||
|
|
||||||
|
req.setRegisteringCollectors(this.registeringCollectors);
|
||||||
|
|
||||||
NodeHeartbeatResponse heartbeatResponse =
|
NodeHeartbeatResponse heartbeatResponse =
|
||||||
resourceTracker.nodeHeartbeat(req);
|
resourceTracker.nodeHeartbeat(req);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if the new active RM could recover collector status on a state
|
||||||
|
* transition.
|
||||||
|
*/
|
||||||
|
public class TestRMHATimelineCollectors extends RMHATestBase {
|
||||||
|
public static final Log LOG = LogFactory
|
||||||
|
.getLog(TestSubmitApplicationWithRMHA.class);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
super.setup();
|
||||||
|
confForRM1.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
confForRM2.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
confForRM1.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
confForRM2.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRebuildCollectorDataOnFailover() throws Exception {
|
||||||
|
startRMs();
|
||||||
|
MockNM nm1
|
||||||
|
= new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||||
|
MockNM nm2
|
||||||
|
= new MockNM("127.0.0.1:5678", 15121, rm2.getResourceTrackerService());
|
||||||
|
RMApp app1 = rm1.submitApp(1024);
|
||||||
|
String collectorAddr1 = "1.2.3.4:5";
|
||||||
|
AppCollectorData data1 = AppCollectorData.newInstance(
|
||||||
|
app1.getApplicationId(), collectorAddr1);
|
||||||
|
nm1.addRegisteringCollector(app1.getApplicationId(), data1);
|
||||||
|
|
||||||
|
String collectorAddr2 = "5.4.3.2:1";
|
||||||
|
RMApp app2 = rm1.submitApp(1024);
|
||||||
|
AppCollectorData data2 = AppCollectorData.newInstance(
|
||||||
|
app2.getApplicationId(), collectorAddr2, rm1.getStartTime(), 1);
|
||||||
|
nm1.addRegisteringCollector(app2.getApplicationId(), data2);
|
||||||
|
|
||||||
|
explicitFailover();
|
||||||
|
|
||||||
|
List<ApplicationId> runningApps = new ArrayList<>();
|
||||||
|
runningApps.add(app1.getApplicationId());
|
||||||
|
runningApps.add(app2.getApplicationId());
|
||||||
|
nm1.registerNode(runningApps);
|
||||||
|
nm2.registerNode(runningApps);
|
||||||
|
|
||||||
|
String collectorAddr12 = "1.2.3.4:56";
|
||||||
|
AppCollectorData data12 = AppCollectorData.newInstance(
|
||||||
|
app1.getApplicationId(), collectorAddr12, rm1.getStartTime(), 0);
|
||||||
|
nm2.addRegisteringCollector(app1.getApplicationId(), data12);
|
||||||
|
|
||||||
|
String collectorAddr22 = "5.4.3.2:10";
|
||||||
|
AppCollectorData data22 = AppCollectorData.newInstance(
|
||||||
|
app2.getApplicationId(), collectorAddr22, rm1.getStartTime(), 2);
|
||||||
|
nm2.addRegisteringCollector(app2.getApplicationId(), data22);
|
||||||
|
|
||||||
|
Map<ApplicationId, AppCollectorData> results1
|
||||||
|
= nm1.nodeHeartbeat(true).getAppCollectors();
|
||||||
|
assertEquals(collectorAddr1,
|
||||||
|
results1.get(app1.getApplicationId()).getCollectorAddr());
|
||||||
|
assertEquals(collectorAddr2,
|
||||||
|
results1.get(app2.getApplicationId()).getCollectorAddr());
|
||||||
|
|
||||||
|
Map<ApplicationId, AppCollectorData> results2
|
||||||
|
= nm2.nodeHeartbeat(true).getAppCollectors();
|
||||||
|
// addr of app1 should be collectorAddr1 since it's registering (no time
|
||||||
|
// stamp).
|
||||||
|
assertEquals(collectorAddr1,
|
||||||
|
results2.get(app1.getApplicationId()).getCollectorAddr());
|
||||||
|
// addr of app2 should be collectorAddr22 since its version number is
|
||||||
|
// greater.
|
||||||
|
assertEquals(collectorAddr22,
|
||||||
|
results2.get(app2.getApplicationId()).getCollectorAddr());
|
||||||
|
|
||||||
|
// Now nm1 should get updated collector list
|
||||||
|
nm1.getRegisteringCollectors().clear();
|
||||||
|
Map<ApplicationId, AppCollectorData> results12
|
||||||
|
= nm1.nodeHeartbeat(true).getAppCollectors();
|
||||||
|
assertEquals(collectorAddr1,
|
||||||
|
results12.get(app1.getApplicationId()).getCollectorAddr());
|
||||||
|
assertEquals(collectorAddr22,
|
||||||
|
results12.get(app2.getApplicationId()).getCollectorAddr());
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue