diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 33dcfbf2731..66c62517684 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -33,6 +33,9 @@ Release 2.3.0 - UNRELEASED YARN-1199. Make NM/RM Versions Available (Mit Desai via jeagles) + YARN-1232. Configuration to support multiple RMs (Karthik Kambatla via + bikas) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java new file mode 100644 index 00000000000..18f98961db6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.conf; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +@InterfaceAudience.Private +public class HAUtil { + private static Log LOG = LogFactory.getLog(HAUtil.class); + + public static final List RPC_ADDRESS_CONF_KEYS = + Collections.unmodifiableList(Arrays.asList( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.RM_WEBAPP_ADDRESS)); + + private HAUtil() { /* Hidden constructor */ } + + private static void throwBadConfigurationException(String msg) { + throw new YarnRuntimeException("Invalid configuration! " + msg); + } + + /** + * Returns true if Resource Manager HA is configured. + * + * @param conf Configuration + * @return true if HA is configured in the configuration; else false. + */ + public static boolean isHAEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.RM_HA_ENABLED, + YarnConfiguration.DEFAULT_RM_HA_ENABLED); + } + + public static Collection getRMHAIds(Configuration conf) { + return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS); + } + + /** + * @param conf Configuration + * @return RM Id on success + * @throws YarnRuntimeException for configurations without a node id + */ + @VisibleForTesting + public static String getRMHAId(Configuration conf) { + String rmId = conf.get(YarnConfiguration.RM_HA_ID); + if (rmId == null) { + throwBadConfigurationException(YarnConfiguration.RM_HA_ID + + " needs to be set in a HA configuration"); + } + return rmId; + } + + private static String getConfValueForRMInstance(String prefix, + Configuration conf) { + String confKey = addSuffix(prefix, getRMHAId(conf)); + String retVal = conf.get(confKey); + if (LOG.isTraceEnabled()) { + LOG.trace("getConfValueForRMInstance: prefix = " + prefix + + "; confKey being looked up = " + confKey + + "; value being set to = " + retVal); + } + return retVal; + } + + static String getConfValueForRMInstance(String prefix, String defaultValue, + Configuration conf) { + String value = getConfValueForRMInstance(prefix, conf); + return (value == null) ? defaultValue : value; + } + + private static void setConfValue(String prefix, Configuration conf) { + conf.set(prefix, getConfValueForRMInstance(prefix, conf)); + } + + public static void setAllRpcAddresses(Configuration conf) { + for (String confKey : RPC_ADDRESS_CONF_KEYS) { + setConfValue(confKey, conf); + } + } + + /** Add non empty and non null suffix to a key */ + @VisibleForTesting + public static String addSuffix(String key, String suffix) { + if (suffix == null || suffix.isEmpty()) { + return key; + } + if (suffix.startsWith(".")) { + throw new IllegalArgumentException("suffix '" + suffix + "' should not " + + "already have '.' prepended."); + } + return key + "." + suffix; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dc5baa1a166..e1327dee5a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -18,15 +18,12 @@ package org.apache.hadoop.yarn.conf; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -84,7 +81,7 @@ public class YarnConfiguration extends Configuration { // Resource Manager Configs //////////////////////////////// public static final String RM_PREFIX = "yarn.resourcemanager."; - + /** The address of the applications manager interface in the RM.*/ public static final String RM_ADDRESS = RM_PREFIX + "address"; @@ -281,6 +278,8 @@ public class YarnConfiguration extends Configuration { public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final boolean DEFAULT_RM_HA_ENABLED = false; + public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; + public static final String RM_HA_ID = RM_HA_PREFIX + "id"; //////////////////////////////// // RM state store configs @@ -854,4 +853,24 @@ public YarnConfiguration(Configuration conf) { this.reloadConfiguration(); } } + + /** + * Get the socket address for name property as a + * InetSocketAddress. + * @param name property name. + * @param defaultAddress the default value + * @param defaultPort the default port + * @return InetSocketAddress + */ + @Override + public InetSocketAddress getSocketAddr( + String name, String defaultAddress, int defaultPort) { + String address; + if (HAUtil.isHAEnabled(this)) { + address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this); + } else { + address = get(name, defaultAddress); + } + return NetUtils.createSocketAddr(address, defaultPort, name); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 37474b89adf..049f4cc8266 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -38,8 +38,11 @@ public class ClientRMProxy extends RMProxy { private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); - public static T createRMProxy(final Configuration conf, + public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { + YarnConfiguration conf = (configuration instanceof YarnConfiguration) + ? (YarnConfiguration) configuration + : new YarnConfiguration(configuration); InetSocketAddress rmAddress = getRMAddress(conf, protocol); return createRMProxy(conf, protocol, rmAddress); } @@ -60,7 +63,7 @@ private static void setupTokens(InetSocketAddress resourceManagerAddress) } } - private static InetSocketAddress getRMAddress(Configuration conf, + private static InetSocketAddress getRMAddress(YarnConfiguration conf, Class protocol) throws IOException { if (protocol == ApplicationClientProtocol.class) { return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 171b118b7bf..6dfeb6662e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -320,13 +320,34 @@ - Enable RM high-availability. When enabled, the RM starts - in the Standby mode by default, and transitions to the Active mode when - prompted to. + Enable RM high-availability. When enabled, + (1) The RM starts in the Standby mode by default, and transitions to + the Active mode when prompted to. + (2) The nodes in the RM ensemble are listed in + yarn.resourcemanager.ha.rm-ids + (3) The id of each RM comes from yarn.resourcemanager.ha.id + (4) The actual physical addresses come from the configs of the pattern + - {rpc-config}.{id} yarn.resourcemanager.ha.enabled false + + The list of RM nodes in the cluster when HA is + enabled. See description of yarn.resourcemanager.ha + .enabled for full details on how this is used. + yarn.resourcemanager.ha.rm-ids + + + + + The id (string) of the current RM. When HA is enabled, this + is a required config. See description of yarn.resourcemanager.ha.enabled + for full details on how this is used. + yarn.resourcemanager.ha.id + + + The maximum number of completed applications RM keeps. yarn.resourcemanager.max-completed-applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java new file mode 100644 index 00000000000..e0e46c4dc14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.conf; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestHAUtil { + private Configuration conf; + + private static final String RM1_ADDRESS = "1.2.3.4:8021"; + private static final String RM2_ADDRESS = "localhost:8022"; + private static final String RM1_NODE_ID = "rm1"; + private static final String RM2_NODE_ID = "rm2"; + + @Before + public void setUp() { + conf = new Configuration(); + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); + conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); + + for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { + conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS); + conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS); + } + } + + @Test + public void testGetRMServiceId() throws Exception { + Collection rmhaIds = HAUtil.getRMHAIds(conf); + assertEquals(2, rmhaIds.size()); + } + + @Test + public void testGetRMId() throws Exception { + assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID, + RM1_NODE_ID, HAUtil.getRMHAId(conf)); + conf = new YarnConfiguration(); + try { + HAUtil.getRMHAId(conf); + fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set"); + } catch (YarnRuntimeException yre) { + // do nothing + } + } + + @Test + public void testSetGetRpcAddresses() throws Exception { + HAUtil.setAllRpcAddresses(conf); + for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { + assertEquals("RPC address not set for " + confKey, + RM1_ADDRESS, conf.get(confKey)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 0f90310d998..c25c5977b80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -31,13 +31,17 @@ public class ServerRMProxy extends RMProxy { private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); - public static T createRMProxy(final Configuration conf, + public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { + YarnConfiguration conf = (configuration instanceof YarnConfiguration) + ? (YarnConfiguration) configuration + : new YarnConfiguration(configuration); InetSocketAddress rmAddress = getRMAddress(conf, protocol); return createRMProxy(conf, protocol, rmAddress); } - private static InetSocketAddress getRMAddress(Configuration conf, Class protocol) { + private static InetSocketAddress getRMAddress(YarnConfiguration conf, + Class protocol) { if (protocol == ResourceTracker.class) { return conf.getSocketAddr( YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java index b9aca3cbe43..8fb92facd73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java @@ -29,8 +29,8 @@ import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.conf.HAUtil; import java.io.IOException; @@ -44,6 +44,7 @@ public class RMHAProtocolService extends AbstractService implements private ResourceManager rm; @VisibleForTesting protected HAServiceState haState = HAServiceState.INITIALIZING; + private boolean haEnabled; public RMHAProtocolService(ResourceManager resourceManager) { super("RMHAProtocolService"); @@ -51,17 +52,20 @@ public RMHAProtocolService(ResourceManager resourceManager) { } @Override - public synchronized void serviceInit(Configuration conf) throws Exception { + protected synchronized void serviceInit(Configuration conf) throws + Exception { this.conf = conf; + haEnabled = HAUtil.isHAEnabled(this.conf); + if (haEnabled) { + HAUtil.setAllRpcAddresses(this.conf); + rm.setConf(this.conf); + } rm.createAndInitActiveServices(); super.serviceInit(this.conf); } @Override - public synchronized void serviceStart() throws Exception { - boolean haEnabled = this.conf.getBoolean(YarnConfiguration.RM_HA_ENABLED, - YarnConfiguration.DEFAULT_RM_HA_ENABLED); - + protected synchronized void serviceStart() throws Exception { if (haEnabled) { transitionToStandby(true); } else { @@ -72,7 +76,7 @@ public synchronized void serviceStart() throws Exception { } @Override - public synchronized void serviceStop() throws Exception { + protected synchronized void serviceStop() throws Exception { transitionToStandby(false); haState = HAServiceState.STOPPING; super.serviceStop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 72d38084b3d..3a059217759 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -731,6 +731,10 @@ protected void startWepApp() { webApp = builder.start(new RMWebApp(this)); } + void setConf(Configuration configuration) { + conf = configuration; + } + /** * Helper method to create and init {@link #activeServices}. This creates an * instance of {@link RMActiveServices} and initializes it. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 7415791f094..869526e97cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -26,6 +26,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.conf.HAUtil; import org.junit.Before; import org.junit.Test; @@ -42,10 +43,19 @@ public class TestRMHA { private static final String STATE_ERR = "ResourceManager is in wrong HA state"; + private static final String RM1_ADDRESS = "0.0.0.0:0"; + private static final String RM1_NODE_ID = "rm1"; + @Before public void setUp() throws Exception { Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID); + for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { + conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS); + } + conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); + rm = new MockRM(conf); rm.init(conf); }