From 2ed6ee6fe0251df57d2b0be0c8defea727032e89 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Fri, 6 Dec 2013 00:10:03 +0000 Subject: [PATCH] YARN-1181. Augment MiniYARNCluster to support HA mode (kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1548333 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/yarn/server/MiniYARNCluster.java | 196 +++++++++++++----- .../yarn/server/TestMiniYARNClusterForHA.java | 71 +++++++ 3 files changed, 216 insertions(+), 53 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9daf59e441e..6f62dae71e7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -123,6 +123,8 @@ Release 2.4.0 - UNRELEASED YARN-1403. Separate out configuration loading from QueueManager in the Fair Scheduler (Sandy Ryza) + YARN-1181. 
Augment MiniYARNCluster to support HA mode (Karthik Kambatla) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index dbb65075ef8..9829e86ab68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -25,18 +25,21 @@ import java.net.UnknownHostException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -87,7 +90,7 @@ public class MiniYARNCluster extends CompositeService { } private NodeManager[] nodeManagers; - private ResourceManager resourceManager; + private ResourceManager[] resourceManagers; 
private ResourceManagerWrapper resourceManagerWrapper; @@ -103,12 +106,14 @@ public class MiniYARNCluster extends CompositeService { /** * @param testName name of the test - * @param noOfNodeManagers the number of node managers in the cluster + * @param numResourceManagers the number of resource managers in the cluster + * @param numNodeManagers the number of node managers in the cluster * @param numLocalDirs the number of nm-local-dirs per nodemanager * @param numLogDirs the number of nm-log-dirs per nodemanager */ - public MiniYARNCluster(String testName, int noOfNodeManagers, - int numLocalDirs, int numLogDirs) { + public MiniYARNCluster( + String testName, int numResourceManagers, int numNodeManagers, + int numLocalDirs, int numLogDirs) { super(testName.replace("$", "")); this.numLocalDirs = numLocalDirs; this.numLogDirs = numLogDirs; @@ -157,28 +162,103 @@ public class MiniYARNCluster extends CompositeService { this.testWorkDir = targetWorkDir; } - resourceManagerWrapper = new ResourceManagerWrapper(); - addService(resourceManagerWrapper); - nodeManagers = new CustomNodeManager[noOfNodeManagers]; - for(int index = 0; index < noOfNodeManagers; index++) { + resourceManagers = new ResourceManager[numResourceManagers]; + for (int i = 0; i < numResourceManagers; i++) { + resourceManagers[i] = new ResourceManager(); + addService(new ResourceManagerWrapper(i)); + } + nodeManagers = new CustomNodeManager[numNodeManagers]; + for(int index = 0; index < numNodeManagers; index++) { addService(new NodeManagerWrapper(index)); nodeManagers[index] = new CustomNodeManager(); } } - - @Override + + /** + * @param testName name of the test + * @param numNodeManagers the number of node managers in the cluster + * @param numLocalDirs the number of nm-local-dirs per nodemanager + * @param numLogDirs the number of nm-log-dirs per nodemanager + */ + public MiniYARNCluster(String testName, int numNodeManagers, + int numLocalDirs, int numLogDirs) { + this(testName, 1, numNodeManagers, 
numLocalDirs, numLogDirs); + } + + @Override public void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf instanceof YarnConfiguration ? conf - : new YarnConfiguration( - conf)); + if (resourceManagers.length > 1) { + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + /* RM ids are generated as rm0,rm1,... in array order. */ + StringBuilder rmIds = new StringBuilder(); + for (int i = 0; i < resourceManagers.length; i++) { + if (i != 0) { + rmIds.append(","); + } + rmIds.append("rm" + i); + } + conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); + } + super.serviceInit( + conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } public File getTestWorkDir() { return testWorkDir; } + /** + * In an HA cluster, go through all the RMs and find the Active RM. If none + * of them are active, wait up to 5 seconds for them to transition to Active. + * + * In a non-HA cluster, return the index of the only RM. + * + * @return index of the active RM, or -1 if no RM became active in time + */ + @InterfaceAudience.Private + @VisibleForTesting + int getActiveRMIndex() { + if (resourceManagers.length == 1) { + return 0; + } + /* Scan all RMs up to five times, sleeping one second after each scan. */ + int numRetriesForRMBecomingActive = 5; + while (numRetriesForRMBecomingActive-- > 0) { + for (int i = 0; i < resourceManagers.length; i++) { + try { + if (HAServiceProtocol.HAServiceState.ACTIVE == + resourceManagers[i].getRMContext().getRMAdminService() + .getServiceStatus().getState()) { + return i; + } + } catch (IOException e) { + throw new YarnRuntimeException("Couldn't read the status of " + + "a ResourceManger in the HA ensemble.", e); + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new YarnRuntimeException("Interrupted while waiting for one " + + "of the ResourceManagers to become active", e); + } + } + return -1; + } + + /** + * @return the active {@link ResourceManager} of the cluster, + * null if none of them are active. 
+ */ public ResourceManager getResourceManager() { - return this.resourceManager; + int activeRMIndex = getActiveRMIndex(); /* single lookup: it may block and its answer may change between calls */ + return activeRMIndex == -1 + ? null + : this.resourceManagers[activeRMIndex]; + } + /** @return the RM at index {@code i}, whether or not it is active */ + public ResourceManager getResourceManager(int i) { + return this.resourceManagers[i]; } public NodeManager getNodeManager(int i) { @@ -195,8 +275,29 @@ public class MiniYARNCluster extends CompositeService { } private class ResourceManagerWrapper extends AbstractService { - public ResourceManagerWrapper() { - super(ResourceManagerWrapper.class.getName()); + private int index; + + public ResourceManagerWrapper(int i) { + super(ResourceManagerWrapper.class.getName() + "_" + i); + index = i; + } + /* Points the RM, admin, scheduler, resource-tracker and web app addresses at port 0 on this host. */ + private void setNonHARMConfiguration(Configuration conf) { + String hostname = MiniYARNCluster.getHostname(); + conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0"); + WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0); + } + /* Sets this RM's HA id ("rm" + index) and each rmId-suffixed RPC address key to port 0 on this host. */ + private void setHARMConfiguration(Configuration conf) { + String rmId = "rm" + index; + String hostname = MiniYARNCluster.getHostname(); + conf.set(YarnConfiguration.RM_HA_ID, rmId); + for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) { + conf.set(HAUtil.addSuffix(confKey, rmId), hostname + ":0"); + } } @Override @@ -206,22 +307,15 @@ public class MiniYARNCluster extends CompositeService { if (!conf.getBoolean( YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { - // pick free random ports. 
- String hostname = MiniYARNCluster.getHostname(); - conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0"); - WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0); + if (HAUtil.isHAEnabled(conf)) { + setHARMConfiguration(conf); + } else { + setNonHARMConfiguration(conf); + } } - resourceManager = new ResourceManager() { - @Override - protected void doSecureLogin() throws IOException { - // Don't try to login using keytab in the testcase. - }; - }; - resourceManager.init(conf); - resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class, + resourceManagers[index].init(conf); + resourceManagers[index].getRMContext().getDispatcher().register + (RMAppAttemptEventType.class, new EventHandler() { public void handle(RMAppAttemptEvent event) { if (event instanceof RMAppAttemptRegistrationEvent) { @@ -239,20 +333,20 @@ public class MiniYARNCluster extends CompositeService { try { new Thread() { public void run() { - resourceManager.start(); - }; + resourceManagers[index].start(); + } }.start(); int waitCount = 0; - while (resourceManager.getServiceState() == STATE.INITED + while (resourceManagers[index].getServiceState() == STATE.INITED && waitCount++ < 60) { LOG.info("Waiting for RM to start..."); Thread.sleep(1500); } - if (resourceManager.getServiceState() != STATE.STARTED) { + if (resourceManagers[index].getServiceState() != STATE.STARTED) { // RM could have failed. throw new IOException( "ResourceManager failed to start. 
Final state is " - + resourceManager.getServiceState()); + + resourceManagers[index].getServiceState()); } super.serviceStart(); } catch (Throwable t) { @@ -278,9 +372,9 @@ public class MiniYARNCluster extends CompositeService { @Override protected synchronized void serviceStop() throws Exception { - if (resourceManager != null) { + if (resourceManagers[index] != null) { waitForAppMastersToFinish(5000); - resourceManager.stop(); + resourceManagers[index].stop(); } super.serviceStop(); @@ -372,7 +466,7 @@ public class MiniYARNCluster extends CompositeService { new Thread() { public void run() { nodeManagers[index].start(); - }; + } }.start(); int waitCount = 0; while (nodeManagers[index].getServiceState() == STATE.INITED @@ -398,12 +492,12 @@ public class MiniYARNCluster extends CompositeService { super.serviceStop(); } } - + private class CustomNodeManager extends NodeManager { @Override protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. 
- }; + } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -412,8 +506,8 @@ public class MiniYARNCluster extends CompositeService { healthChecker, metrics) { @Override protected ResourceTracker getRMClient() { - final ResourceTrackerService rt = resourceManager - .getResourceTrackerService(); + final ResourceTrackerService rt = + getResourceManager().getResourceTrackerService(); final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -424,8 +518,7 @@ public class MiniYARNCluster extends CompositeService { public NodeHeartbeatResponse nodeHeartbeat( NodeHeartbeatRequest request) throws YarnException, IOException { - NodeHeartbeatResponse response = recordFactory.newRecordInstance( - NodeHeartbeatResponse.class); + NodeHeartbeatResponse response; try { response = rt.nodeHeartbeat(request); } catch (YarnException e) { @@ -440,8 +533,7 @@ public class MiniYARNCluster extends CompositeService { public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - RegisterNodeManagerResponse response = recordFactory. 
- newRecordInstance(RegisterNodeManagerResponse.class); + RegisterNodeManagerResponse response; try { response = rt.registerNodeManager(request); } catch (YarnException e) { @@ -452,13 +544,11 @@ public class MiniYARNCluster extends CompositeService { return response; } }; - }; + } @Override - protected void stopRMProxy() { - return; - } + protected void stopRMProxy() { } }; - }; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java new file mode 100644 index 00000000000..f62124e5d39 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java @@ -0,0 +1,71 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.fail; +/** Brings up a MiniYARNCluster with two ResourceManagers and one NodeManager and checks that the NodeManager registers with RM 0. NOTE(review): nothing here stops the cluster after a test - confirm whether teardown is handled elsewhere. */ +public class TestMiniYARNClusterForHA { + MiniYARNCluster cluster; + /** Starts the cluster, asks RM 0 to transition to active, and fails if no RM reports active. */ + @Before + public void setup() throws IOException, InterruptedException { + Configuration conf = new YarnConfiguration(); + + cluster = new MiniYARNCluster(TestMiniYARNClusterForHA.class.getName(), + 2, 1, 1, 1); + cluster.init(conf); + cluster.start(); + + cluster.getResourceManager(0).getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + } + /** Polls RM 0's cluster metrics up to 600 times, 100 ms apart, until it reports exactly one NodeManager; fails otherwise. */ + @Test + public void testClusterWorks() throws YarnException, InterruptedException { + ResourceManager rm = cluster.getResourceManager(0); + GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); + + for (int i = 0; i < 600; i++) { + if (1 == rm.getClientRMService().getClusterMetrics(req) + .getClusterMetrics().getNumNodeManagers()) { + return; + } + Thread.sleep(100); + } + fail("NodeManager never registered with the RM"); + } +}