YARN-1181. Augment MiniYARNCluster to support HA mode (kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1548333 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2013-12-06 00:10:03 +00:00
parent 7505baf49f
commit 2ed6ee6fe0
3 changed files with 216 additions and 53 deletions

View File

@ -123,6 +123,8 @@ Release 2.4.0 - UNRELEASED
YARN-1403. Separate out configuration loading from QueueManager in the Fair YARN-1403. Separate out configuration loading from QueueManager in the Fair
Scheduler (Sandy Ryza) Scheduler (Sandy Ryza)
YARN-1181. Augment MiniYARNCluster to support HA mode (Karthik Kambatla)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -25,18 +25,21 @@ import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -87,7 +90,7 @@ public class MiniYARNCluster extends CompositeService {
} }
private NodeManager[] nodeManagers; private NodeManager[] nodeManagers;
private ResourceManager resourceManager; private ResourceManager[] resourceManagers;
private ResourceManagerWrapper resourceManagerWrapper; private ResourceManagerWrapper resourceManagerWrapper;
@ -103,12 +106,14 @@ public class MiniYARNCluster extends CompositeService {
/** /**
* @param testName name of the test * @param testName name of the test
* @param noOfNodeManagers the number of node managers in the cluster * @param numResourceManagers the number of resource managers in the cluster
* @param numNodeManagers the number of node managers in the cluster
* @param numLocalDirs the number of nm-local-dirs per nodemanager * @param numLocalDirs the number of nm-local-dirs per nodemanager
* @param numLogDirs the number of nm-log-dirs per nodemanager * @param numLogDirs the number of nm-log-dirs per nodemanager
*/ */
public MiniYARNCluster(String testName, int noOfNodeManagers, public MiniYARNCluster(
int numLocalDirs, int numLogDirs) { String testName, int numResourceManagers, int numNodeManagers,
int numLocalDirs, int numLogDirs) {
super(testName.replace("$", "")); super(testName.replace("$", ""));
this.numLocalDirs = numLocalDirs; this.numLocalDirs = numLocalDirs;
this.numLogDirs = numLogDirs; this.numLogDirs = numLogDirs;
@ -157,28 +162,103 @@ public class MiniYARNCluster extends CompositeService {
this.testWorkDir = targetWorkDir; this.testWorkDir = targetWorkDir;
} }
resourceManagerWrapper = new ResourceManagerWrapper(); resourceManagers = new ResourceManager[numResourceManagers];
addService(resourceManagerWrapper); for (int i = 0; i < numResourceManagers; i++) {
nodeManagers = new CustomNodeManager[noOfNodeManagers]; resourceManagers[i] = new ResourceManager();
for(int index = 0; index < noOfNodeManagers; index++) { addService(new ResourceManagerWrapper(i));
}
nodeManagers = new CustomNodeManager[numNodeManagers];
for(int index = 0; index < numNodeManagers; index++) {
addService(new NodeManagerWrapper(index)); addService(new NodeManagerWrapper(index));
nodeManagers[index] = new CustomNodeManager(); nodeManagers[index] = new CustomNodeManager();
} }
} }
@Override /**
* @param testName name of the test
* @param numNodeManagers the number of node managers in the cluster
* @param numLocalDirs the number of nm-local-dirs per nodemanager
* @param numLogDirs the number of nm-log-dirs per nodemanager
*/
public MiniYARNCluster(String testName, int numNodeManagers,
int numLocalDirs, int numLogDirs) {
this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
}
@Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf instanceof YarnConfiguration ? conf if (resourceManagers.length > 1) {
: new YarnConfiguration( conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf));
StringBuilder rmIds = new StringBuilder();
for (int i = 0; i < resourceManagers.length; i++) {
if (i != 0) {
rmIds.append(",");
}
rmIds.append("rm" + i);
}
conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
}
super.serviceInit(
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
} }
public File getTestWorkDir() { public File getTestWorkDir() {
return testWorkDir; return testWorkDir;
} }
/**
* In a HA cluster, go through all the RMs and find the Active RM. If none
* of them are active, wait upto 5 seconds for them to transition to Active.
*
* In an non-HA cluster, return the index of the only RM.
*
* @return index of the active RM
*/
@InterfaceAudience.Private
@VisibleForTesting
int getActiveRMIndex() {
if (resourceManagers.length == 1) {
return 0;
}
int numRetriesForRMBecomingActive = 5;
while (numRetriesForRMBecomingActive-- > 0) {
for (int i = 0; i < resourceManagers.length; i++) {
try {
if (HAServiceProtocol.HAServiceState.ACTIVE ==
resourceManagers[i].getRMContext().getRMAdminService()
.getServiceStatus().getState()) {
return i;
}
} catch (IOException e) {
throw new YarnRuntimeException("Couldn't read the status of " +
"a ResourceManger in the HA ensemble.", e);
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new YarnRuntimeException("Interrupted while waiting for one " +
"of the ResourceManagers to become active");
}
}
return -1;
}
/**
* @return the active {@link ResourceManager} of the cluster,
* null if none of them are active.
*/
public ResourceManager getResourceManager() { public ResourceManager getResourceManager() {
return this.resourceManager; int activeRMIndex = getActiveRMIndex();
return activeRMIndex == -1
? null
: this.resourceManagers[getActiveRMIndex()];
}
public ResourceManager getResourceManager(int i) {
return this.resourceManagers[i];
} }
public NodeManager getNodeManager(int i) { public NodeManager getNodeManager(int i) {
@ -195,8 +275,29 @@ public class MiniYARNCluster extends CompositeService {
} }
private class ResourceManagerWrapper extends AbstractService { private class ResourceManagerWrapper extends AbstractService {
public ResourceManagerWrapper() { private int index;
super(ResourceManagerWrapper.class.getName());
public ResourceManagerWrapper(int i) {
super(ResourceManagerWrapper.class.getName() + "_" + i);
index = i;
}
private void setNonHARMConfiguration(Configuration conf) {
String hostname = MiniYARNCluster.getHostname();
conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
}
private void setHARMConfiguration(Configuration conf) {
String rmId = "rm" + index;
String hostname = MiniYARNCluster.getHostname();
conf.set(YarnConfiguration.RM_HA_ID, rmId);
for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(confKey, rmId), hostname + ":0");
}
} }
@Override @Override
@ -206,22 +307,15 @@ public class MiniYARNCluster extends CompositeService {
if (!conf.getBoolean( if (!conf.getBoolean(
YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
// pick free random ports. if (HAUtil.isHAEnabled(conf)) {
String hostname = MiniYARNCluster.getHostname(); setHARMConfiguration(conf);
conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); } else {
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); setNonHARMConfiguration(conf);
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); }
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
} }
resourceManager = new ResourceManager() { resourceManagers[index].init(conf);
@Override resourceManagers[index].getRMContext().getDispatcher().register
protected void doSecureLogin() throws IOException { (RMAppAttemptEventType.class,
// Don't try to login using keytab in the testcase.
};
};
resourceManager.init(conf);
resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
new EventHandler<RMAppAttemptEvent>() { new EventHandler<RMAppAttemptEvent>() {
public void handle(RMAppAttemptEvent event) { public void handle(RMAppAttemptEvent event) {
if (event instanceof RMAppAttemptRegistrationEvent) { if (event instanceof RMAppAttemptRegistrationEvent) {
@ -239,20 +333,20 @@ public class MiniYARNCluster extends CompositeService {
try { try {
new Thread() { new Thread() {
public void run() { public void run() {
resourceManager.start(); resourceManagers[index].start();
}; }
}.start(); }.start();
int waitCount = 0; int waitCount = 0;
while (resourceManager.getServiceState() == STATE.INITED while (resourceManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) { && waitCount++ < 60) {
LOG.info("Waiting for RM to start..."); LOG.info("Waiting for RM to start...");
Thread.sleep(1500); Thread.sleep(1500);
} }
if (resourceManager.getServiceState() != STATE.STARTED) { if (resourceManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed. // RM could have failed.
throw new IOException( throw new IOException(
"ResourceManager failed to start. Final state is " "ResourceManager failed to start. Final state is "
+ resourceManager.getServiceState()); + resourceManagers[index].getServiceState());
} }
super.serviceStart(); super.serviceStart();
} catch (Throwable t) { } catch (Throwable t) {
@ -278,9 +372,9 @@ public class MiniYARNCluster extends CompositeService {
@Override @Override
protected synchronized void serviceStop() throws Exception { protected synchronized void serviceStop() throws Exception {
if (resourceManager != null) { if (resourceManagers[index] != null) {
waitForAppMastersToFinish(5000); waitForAppMastersToFinish(5000);
resourceManager.stop(); resourceManagers[index].stop();
} }
super.serviceStop(); super.serviceStop();
@ -372,7 +466,7 @@ public class MiniYARNCluster extends CompositeService {
new Thread() { new Thread() {
public void run() { public void run() {
nodeManagers[index].start(); nodeManagers[index].start();
}; }
}.start(); }.start();
int waitCount = 0; int waitCount = 0;
while (nodeManagers[index].getServiceState() == STATE.INITED while (nodeManagers[index].getServiceState() == STATE.INITED
@ -403,7 +497,7 @@ public class MiniYARNCluster extends CompositeService {
@Override @Override
protected void doSecureLogin() throws IOException { protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase. // Don't try to login using keytab in the testcase.
}; }
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@ -412,8 +506,8 @@ public class MiniYARNCluster extends CompositeService {
healthChecker, metrics) { healthChecker, metrics) {
@Override @Override
protected ResourceTracker getRMClient() { protected ResourceTracker getRMClient() {
final ResourceTrackerService rt = resourceManager final ResourceTrackerService rt =
.getResourceTrackerService(); getResourceManager().getResourceTrackerService();
final RecordFactory recordFactory = final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
@ -424,8 +518,7 @@ public class MiniYARNCluster extends CompositeService {
public NodeHeartbeatResponse nodeHeartbeat( public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnException, NodeHeartbeatRequest request) throws YarnException,
IOException { IOException {
NodeHeartbeatResponse response = recordFactory.newRecordInstance( NodeHeartbeatResponse response;
NodeHeartbeatResponse.class);
try { try {
response = rt.nodeHeartbeat(request); response = rt.nodeHeartbeat(request);
} catch (YarnException e) { } catch (YarnException e) {
@ -440,8 +533,7 @@ public class MiniYARNCluster extends CompositeService {
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) RegisterNodeManagerRequest request)
throws YarnException, IOException { throws YarnException, IOException {
RegisterNodeManagerResponse response = recordFactory. RegisterNodeManagerResponse response;
newRecordInstance(RegisterNodeManagerResponse.class);
try { try {
response = rt.registerNodeManager(request); response = rt.registerNodeManager(request);
} catch (YarnException e) { } catch (YarnException e) {
@ -452,13 +544,11 @@ public class MiniYARNCluster extends CompositeService {
return response; return response;
} }
}; };
}; }
@Override @Override
protected void stopRMProxy() { protected void stopRMProxy() { }
return;
}
}; };
}; }
} }
} }

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.fail;
public class TestMiniYARNClusterForHA {
MiniYARNCluster cluster;
@Before
public void setup() throws IOException, InterruptedException {
Configuration conf = new YarnConfiguration();
cluster = new MiniYARNCluster(TestMiniYARNClusterForHA.class.getName(),
2, 1, 1, 1);
cluster.init(conf);
cluster.start();
cluster.getResourceManager(0).getRMContext().getRMAdminService()
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
}
@Test
public void testClusterWorks() throws YarnException, InterruptedException {
ResourceManager rm = cluster.getResourceManager(0);
GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
for (int i = 0; i < 600; i++) {
if (1 == rm.getClientRMService().getClusterMetrics(req)
.getClusterMetrics().getNumNodeManagers()) {
return;
}
Thread.sleep(100);
}
fail("NodeManager never registered with the RM");
}
}