YARN-1232. Configuration to support multiple RMs (Karthik Kambatla via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529251 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-10-04 18:40:18 +00:00
parent d4324eef14
commit cbab04727b
10 changed files with 286 additions and 18 deletions

View File

@ -48,6 +48,9 @@ Release 2.3.0 - UNRELEASED
YARN-1199. Make NM/RM Versions Available (Mit Desai via jeagles)
YARN-1232. Configuration to support multiple RMs (Karthik Kambatla via
bikas)
OPTIMIZATIONS
BUG FIXES

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.conf;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@InterfaceAudience.Private
public class HAUtil {
private static Log LOG = LogFactory.getLog(HAUtil.class);
public static final List<String> RPC_ADDRESS_CONF_KEYS =
Collections.unmodifiableList(Arrays.asList(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.RM_WEBAPP_ADDRESS));
private HAUtil() { /* Hidden constructor */ }
private static void throwBadConfigurationException(String msg) {
throw new YarnRuntimeException("Invalid configuration! " + msg);
}
/**
* Returns true if Resource Manager HA is configured.
*
* @param conf Configuration
* @return true if HA is configured in the configuration; else false.
*/
public static boolean isHAEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.RM_HA_ENABLED,
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
}
public static Collection<String> getRMHAIds(Configuration conf) {
return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
}
/**
* @param conf Configuration
* @return RM Id on success
* @throws YarnRuntimeException for configurations without a node id
*/
@VisibleForTesting
public static String getRMHAId(Configuration conf) {
String rmId = conf.get(YarnConfiguration.RM_HA_ID);
if (rmId == null) {
throwBadConfigurationException(YarnConfiguration.RM_HA_ID +
" needs to be set in a HA configuration");
}
return rmId;
}
private static String getConfValueForRMInstance(String prefix,
Configuration conf) {
String confKey = addSuffix(prefix, getRMHAId(conf));
String retVal = conf.get(confKey);
if (LOG.isTraceEnabled()) {
LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
"; confKey being looked up = " + confKey +
"; value being set to = " + retVal);
}
return retVal;
}
static String getConfValueForRMInstance(String prefix, String defaultValue,
Configuration conf) {
String value = getConfValueForRMInstance(prefix, conf);
return (value == null) ? defaultValue : value;
}
private static void setConfValue(String prefix, Configuration conf) {
conf.set(prefix, getConfValueForRMInstance(prefix, conf));
}
public static void setAllRpcAddresses(Configuration conf) {
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
setConfValue(confKey, conf);
}
}
/** Add non empty and non null suffix to a key */
@VisibleForTesting
public static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.isEmpty()) {
return key;
}
if (suffix.startsWith(".")) {
throw new IllegalArgumentException("suffix '" + suffix + "' should not " +
"already have '.' prepended.");
}
return key + "." + suffix;
}
}

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.yarn.conf;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@ -281,6 +278,8 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
public static final boolean DEFAULT_RM_HA_ENABLED = false;
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
////////////////////////////////
// RM state store configs
@ -854,4 +853,24 @@ public class YarnConfiguration extends Configuration {
this.reloadConfiguration();
}
}
/**
* Get the socket address for <code>name</code> property as a
* <code>InetSocketAddress</code>.
* @param name property name.
* @param defaultAddress the default value
* @param defaultPort the default port
* @return InetSocketAddress
*/
@Override
public InetSocketAddress getSocketAddr(
String name, String defaultAddress, int defaultPort) {
String address;
if (HAUtil.isHAEnabled(this)) {
address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
} else {
address = get(name, defaultAddress);
}
return NetUtils.createSocketAddr(address, defaultPort, name);
}
}

View File

@ -38,8 +38,11 @@ public class ClientRMProxy<T> extends RMProxy<T> {
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
public static <T> T createRMProxy(final Configuration conf,
public static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol) throws IOException {
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration
: new YarnConfiguration(configuration);
InetSocketAddress rmAddress = getRMAddress(conf, protocol);
return createRMProxy(conf, protocol, rmAddress);
}
@ -60,7 +63,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
}
}
private static InetSocketAddress getRMAddress(Configuration conf,
private static InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) throws IOException {
if (protocol == ApplicationClientProtocol.class) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,

View File

@ -320,13 +320,34 @@
</property>
<property>
<description>Enable RM high-availability. When enabled, the RM starts
in the Standby mode by default, and transitions to the Active mode when
prompted to.</description>
<description>Enable RM high-availability. When enabled,
(1) The RM starts in the Standby mode by default, and transitions to
the Active mode when prompted to.
(2) The nodes in the RM ensemble are listed in
yarn.resourcemanager.ha.rm-ids
(3) The id of each RM comes from yarn.resourcemanager.ha.id
(4) The actual physical addresses come from the configs of the pattern
- {rpc-config}.{id}</description>
<name>yarn.resourcemanager.ha.enabled</name>
<value>false</value>
</property>
<property>
<description>The list of RM nodes in the cluster when HA is
enabled. See description of yarn.resourcemanager.ha
.enabled for full details on how this is used.</description>
<name>yarn.resourcemanager.ha.rm-ids</name>
<!--value>rm1,rm2</value-->
</property>
<property>
<description>The id (string) of the current RM. When HA is enabled, this
is a required config. See description of yarn.resourcemanager.ha.enabled
for full details on how this is used.</description>
<name>yarn.resourcemanager.ha.id</name>
<!--value>rm1</value-->
</property>
<property>
<description>The maximum number of completed applications RM keeps. </description>
<name>yarn.resourcemanager.max-completed-applications</name>

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class TestHAUtil {
private Configuration conf;
private static final String RM1_ADDRESS = "1.2.3.4:8021";
private static final String RM2_ADDRESS = "localhost:8022";
private static final String RM1_NODE_ID = "rm1";
private static final String RM2_NODE_ID = "rm2";
@Before
public void setUp() {
conf = new Configuration();
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
}
}
@Test
public void testGetRMServiceId() throws Exception {
Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
assertEquals(2, rmhaIds.size());
}
@Test
public void testGetRMId() throws Exception {
assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
RM1_NODE_ID, HAUtil.getRMHAId(conf));
conf = new YarnConfiguration();
try {
HAUtil.getRMHAId(conf);
fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set");
} catch (YarnRuntimeException yre) {
// do nothing
}
}
@Test
public void testSetGetRpcAddresses() throws Exception {
HAUtil.setAllRpcAddresses(conf);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
assertEquals("RPC address not set for " + confKey,
RM1_ADDRESS, conf.get(confKey));
}
}
}

View File

@ -31,13 +31,17 @@ public class ServerRMProxy<T> extends RMProxy<T> {
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
public static <T> T createRMProxy(final Configuration conf,
public static <T> T createRMProxy(final Configuration configuration,
final Class<T> protocol) throws IOException {
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration
: new YarnConfiguration(configuration);
InetSocketAddress rmAddress = getRMAddress(conf, protocol);
return createRMProxy(conf, protocol, rmAddress);
}
private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
private static InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) {
if (protocol == ResourceTracker.class) {
return conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,

View File

@ -29,8 +29,8 @@ import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.conf.HAUtil;
import java.io.IOException;
@ -44,6 +44,7 @@ public class RMHAProtocolService extends AbstractService implements
private ResourceManager rm;
@VisibleForTesting
protected HAServiceState haState = HAServiceState.INITIALIZING;
private boolean haEnabled;
public RMHAProtocolService(ResourceManager resourceManager) {
super("RMHAProtocolService");
@ -51,17 +52,20 @@ public class RMHAProtocolService extends AbstractService implements
}
@Override
public synchronized void serviceInit(Configuration conf) throws Exception {
protected synchronized void serviceInit(Configuration conf) throws
Exception {
this.conf = conf;
haEnabled = HAUtil.isHAEnabled(this.conf);
if (haEnabled) {
HAUtil.setAllRpcAddresses(this.conf);
rm.setConf(this.conf);
}
rm.createAndInitActiveServices();
super.serviceInit(this.conf);
}
@Override
public synchronized void serviceStart() throws Exception {
boolean haEnabled = this.conf.getBoolean(YarnConfiguration.RM_HA_ENABLED,
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
protected synchronized void serviceStart() throws Exception {
if (haEnabled) {
transitionToStandby(true);
} else {
@ -72,7 +76,7 @@ public class RMHAProtocolService extends AbstractService implements
}
@Override
public synchronized void serviceStop() throws Exception {
protected synchronized void serviceStop() throws Exception {
transitionToStandby(false);
haState = HAServiceState.STOPPING;
super.serviceStop();

View File

@ -731,6 +731,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
webApp = builder.start(new RMWebApp(this));
}
void setConf(Configuration configuration) {
conf = configuration;
}
/**
* Helper method to create and init {@link #activeServices}. This creates an
* instance of {@link RMActiveServices} and initializes it.

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.junit.Before;
import org.junit.Test;
@ -42,10 +43,19 @@ public class TestRMHA {
private static final String STATE_ERR =
"ResourceManager is in wrong HA state";
private static final String RM1_ADDRESS = "0.0.0.0:0";
private static final String RM1_NODE_ID = "rm1";
@Before
public void setUp() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
}
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
rm = new MockRM(conf);
rm.init(conf);
}