YARN-3359. Recover collector list when RM fails over (Li Lu via Varun Saxena)
This commit is contained in:
parent
643a20a358
commit
91d3443806
|
@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
|||
import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
|
||||
|
@ -465,8 +466,14 @@ public class NodeManager extends CompositeService
|
|||
if (!rmWorkPreservingRestartEnabled) {
|
||||
LOG.info("Cleaning up running containers on resync");
|
||||
containerManager.cleanupContainersOnNMResync();
|
||||
// Clear all known collectors for resync.
|
||||
if (context.getKnownCollectors() != null) {
|
||||
context.getKnownCollectors().clear();
|
||||
}
|
||||
} else {
|
||||
LOG.info("Preserving containers on resync");
|
||||
// Re-register known timeline collectors.
|
||||
reregisterCollectors();
|
||||
}
|
||||
((NodeStatusUpdaterImpl) nodeStatusUpdater)
|
||||
.rebootNodeStatusUpdaterAndRegisterWithRM();
|
||||
|
@ -478,6 +485,38 @@ public class NodeManager extends CompositeService
|
|||
}.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reregisters all collectors known by this node to the RM. This method is
|
||||
* called when the RM needs to resync with the node.
|
||||
*/
|
||||
protected void reregisterCollectors() {
|
||||
Map<ApplicationId, AppCollectorData> knownCollectors
|
||||
= context.getKnownCollectors();
|
||||
if (knownCollectors == null) {
|
||||
return;
|
||||
}
|
||||
Map<ApplicationId, AppCollectorData> registeringCollectors
|
||||
= context.getRegisteringCollectors();
|
||||
for (Map.Entry<ApplicationId, AppCollectorData> entry
|
||||
: knownCollectors.entrySet()) {
|
||||
Application app = context.getApplications().get(entry.getKey());
|
||||
if ((app != null)
|
||||
&& !ApplicationState.FINISHED.equals(app.getApplicationState())) {
|
||||
registeringCollectors.putIfAbsent(entry.getKey(), entry.getValue());
|
||||
AppCollectorData data = entry.getValue();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(entry.getKey() + " : " + data.getCollectorAddr() + "@<"
|
||||
+ data.getRMIdentifier() + ", " + data.getVersion() + ">");
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Remove collector data for done app " + entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
knownCollectors.clear();
|
||||
}
|
||||
|
||||
public static class NMContext implements Context {
|
||||
|
||||
private NodeId nodeId = null;
|
||||
|
|
|
@ -656,6 +656,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
LOG.warn("Cannot update collector info because application ID: " +
|
||||
appId + " is not found in RMContext!");
|
||||
} else {
|
||||
synchronized (rmApp) {
|
||||
AppCollectorData previousCollectorData = rmApp.getCollectorData();
|
||||
if (AppCollectorData.happensBefore(previousCollectorData,
|
||||
collectorData)) {
|
||||
|
@ -665,7 +666,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
// data before this update is done, and the RM then crashes, the
|
||||
// newly updated collector data will get lost.
|
||||
LOG.info("Update collector information for application " + appId
|
||||
+ " with new address: " + collectorData.getCollectorAddr());
|
||||
+ " with new address: " + collectorData.getCollectorAddr()
|
||||
+ " timestamp: " + collectorData.getRMIdentifier()
|
||||
+ ", " + collectorData.getVersion());
|
||||
((RMAppImpl) rmApp).setCollectorData(collectorData);
|
||||
}
|
||||
}
|
||||
|
@ -673,6 +676,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if node in decommissioning state.
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
|
@ -60,6 +62,8 @@ public class MockNM {
|
|||
private String version;
|
||||
private Map<ContainerId, ContainerStatus> containerStats =
|
||||
new HashMap<ContainerId, ContainerStatus>();
|
||||
private Map<ApplicationId, AppCollectorData> registeringCollectors
|
||||
= new ConcurrentHashMap<>();
|
||||
|
||||
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
||||
// scale vcores based on the requested memory
|
||||
|
@ -117,6 +121,15 @@ public class MockNM {
|
|||
true, ++responseId);
|
||||
}
|
||||
|
||||
public void addRegisteringCollector(ApplicationId appId,
|
||||
AppCollectorData data) {
|
||||
this.registeringCollectors.put(appId, data);
|
||||
}
|
||||
|
||||
public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
|
||||
return this.registeringCollectors;
|
||||
}
|
||||
|
||||
public RegisterNodeManagerResponse registerNode() throws Exception {
|
||||
return registerNode(null, null);
|
||||
}
|
||||
|
@ -229,6 +242,9 @@ public class MockNM {
|
|||
req.setNodeStatus(status);
|
||||
req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
|
||||
req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
|
||||
|
||||
req.setRegisteringCollectors(this.registeringCollectors);
|
||||
|
||||
NodeHeartbeatResponse heartbeatResponse =
|
||||
resourceTracker.nodeHeartbeat(req);
|
||||
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Test if the new active RM could recover collector status on a state
|
||||
* transition.
|
||||
*/
|
||||
public class TestRMHATimelineCollectors extends RMHATestBase {
|
||||
public static final Log LOG = LogFactory
|
||||
.getLog(TestSubmitApplicationWithRMHA.class);
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
confForRM1.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
confForRM2.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
confForRM1.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
confForRM2.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRebuildCollectorDataOnFailover() throws Exception {
|
||||
startRMs();
|
||||
MockNM nm1
|
||||
= new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||
MockNM nm2
|
||||
= new MockNM("127.0.0.1:5678", 15121, rm2.getResourceTrackerService());
|
||||
RMApp app1 = rm1.submitApp(1024);
|
||||
String collectorAddr1 = "1.2.3.4:5";
|
||||
AppCollectorData data1 = AppCollectorData.newInstance(
|
||||
app1.getApplicationId(), collectorAddr1);
|
||||
nm1.addRegisteringCollector(app1.getApplicationId(), data1);
|
||||
|
||||
String collectorAddr2 = "5.4.3.2:1";
|
||||
RMApp app2 = rm1.submitApp(1024);
|
||||
AppCollectorData data2 = AppCollectorData.newInstance(
|
||||
app2.getApplicationId(), collectorAddr2, rm1.getStartTime(), 1);
|
||||
nm1.addRegisteringCollector(app2.getApplicationId(), data2);
|
||||
|
||||
explicitFailover();
|
||||
|
||||
List<ApplicationId> runningApps = new ArrayList<>();
|
||||
runningApps.add(app1.getApplicationId());
|
||||
runningApps.add(app2.getApplicationId());
|
||||
nm1.registerNode(runningApps);
|
||||
nm2.registerNode(runningApps);
|
||||
|
||||
String collectorAddr12 = "1.2.3.4:56";
|
||||
AppCollectorData data12 = AppCollectorData.newInstance(
|
||||
app1.getApplicationId(), collectorAddr12, rm1.getStartTime(), 0);
|
||||
nm2.addRegisteringCollector(app1.getApplicationId(), data12);
|
||||
|
||||
String collectorAddr22 = "5.4.3.2:10";
|
||||
AppCollectorData data22 = AppCollectorData.newInstance(
|
||||
app2.getApplicationId(), collectorAddr22, rm1.getStartTime(), 2);
|
||||
nm2.addRegisteringCollector(app2.getApplicationId(), data22);
|
||||
|
||||
Map<ApplicationId, AppCollectorData> results1
|
||||
= nm1.nodeHeartbeat(true).getAppCollectors();
|
||||
assertEquals(collectorAddr1,
|
||||
results1.get(app1.getApplicationId()).getCollectorAddr());
|
||||
assertEquals(collectorAddr2,
|
||||
results1.get(app2.getApplicationId()).getCollectorAddr());
|
||||
|
||||
Map<ApplicationId, AppCollectorData> results2
|
||||
= nm2.nodeHeartbeat(true).getAppCollectors();
|
||||
// addr of app1 should be collectorAddr1 since it's registering (no time
|
||||
// stamp).
|
||||
assertEquals(collectorAddr1,
|
||||
results2.get(app1.getApplicationId()).getCollectorAddr());
|
||||
// addr of app2 should be collectorAddr22 since its version number is
|
||||
// greater.
|
||||
assertEquals(collectorAddr22,
|
||||
results2.get(app2.getApplicationId()).getCollectorAddr());
|
||||
|
||||
// Now nm1 should get updated collector list
|
||||
nm1.getRegisteringCollectors().clear();
|
||||
Map<ApplicationId, AppCollectorData> results12
|
||||
= nm1.nodeHeartbeat(true).getAppCollectors();
|
||||
assertEquals(collectorAddr1,
|
||||
results12.get(app1.getApplicationId()).getCollectorAddr());
|
||||
assertEquals(collectorAddr22,
|
||||
results12.get(app2.getApplicationId()).getCollectorAddr());
|
||||
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue