Merge r1529251 from trunk to branch-2 for YARN-1232. Configuration to support multiple RMs (Karthik Kambatla via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1529285 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
856907b8db
commit
449f3b9457
|
@ -33,6 +33,9 @@ Release 2.3.0 - UNRELEASED
|
|||
|
||||
YARN-1199. Make NM/RM Versions Available (Mit Desai via jeagles)
|
||||
|
||||
YARN-1232. Configuration to support multiple RMs (Karthik Kambatla via
|
||||
bikas)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.conf;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class HAUtil {
|
||||
private static Log LOG = LogFactory.getLog(HAUtil.class);
|
||||
|
||||
public static final List<String> RPC_ADDRESS_CONF_KEYS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
YarnConfiguration.RM_ADDRESS,
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||
YarnConfiguration.RM_WEBAPP_ADDRESS));
|
||||
|
||||
private HAUtil() { /* Hidden constructor */ }
|
||||
|
||||
private static void throwBadConfigurationException(String msg) {
|
||||
throw new YarnRuntimeException("Invalid configuration! " + msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if Resource Manager HA is configured.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return true if HA is configured in the configuration; else false.
|
||||
*/
|
||||
public static boolean isHAEnabled(Configuration conf) {
|
||||
return conf.getBoolean(YarnConfiguration.RM_HA_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
|
||||
}
|
||||
|
||||
public static Collection<String> getRMHAIds(Configuration conf) {
|
||||
return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf Configuration
|
||||
* @return RM Id on success
|
||||
* @throws YarnRuntimeException for configurations without a node id
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static String getRMHAId(Configuration conf) {
|
||||
String rmId = conf.get(YarnConfiguration.RM_HA_ID);
|
||||
if (rmId == null) {
|
||||
throwBadConfigurationException(YarnConfiguration.RM_HA_ID +
|
||||
" needs to be set in a HA configuration");
|
||||
}
|
||||
return rmId;
|
||||
}
|
||||
|
||||
private static String getConfValueForRMInstance(String prefix,
|
||||
Configuration conf) {
|
||||
String confKey = addSuffix(prefix, getRMHAId(conf));
|
||||
String retVal = conf.get(confKey);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
|
||||
"; confKey being looked up = " + confKey +
|
||||
"; value being set to = " + retVal);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
static String getConfValueForRMInstance(String prefix, String defaultValue,
|
||||
Configuration conf) {
|
||||
String value = getConfValueForRMInstance(prefix, conf);
|
||||
return (value == null) ? defaultValue : value;
|
||||
}
|
||||
|
||||
private static void setConfValue(String prefix, Configuration conf) {
|
||||
conf.set(prefix, getConfValueForRMInstance(prefix, conf));
|
||||
}
|
||||
|
||||
public static void setAllRpcAddresses(Configuration conf) {
|
||||
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
|
||||
setConfValue(confKey, conf);
|
||||
}
|
||||
}
|
||||
|
||||
/** Add non empty and non null suffix to a key */
|
||||
@VisibleForTesting
|
||||
public static String addSuffix(String key, String suffix) {
|
||||
if (suffix == null || suffix.isEmpty()) {
|
||||
return key;
|
||||
}
|
||||
if (suffix.startsWith(".")) {
|
||||
throw new IllegalArgumentException("suffix '" + suffix + "' should not " +
|
||||
"already have '.' prepended.");
|
||||
}
|
||||
return key + "." + suffix;
|
||||
}
|
||||
}
|
|
@ -18,15 +18,12 @@
|
|||
|
||||
package org.apache.hadoop.yarn.conf;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
|
@ -281,6 +278,8 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
|
||||
public static final boolean DEFAULT_RM_HA_ENABLED = false;
|
||||
|
||||
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
|
||||
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
|
||||
|
||||
////////////////////////////////
|
||||
// RM state store configs
|
||||
|
@ -854,4 +853,24 @@ public class YarnConfiguration extends Configuration {
|
|||
this.reloadConfiguration();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the socket address for <code>name</code> property as a
|
||||
* <code>InetSocketAddress</code>.
|
||||
* @param name property name.
|
||||
* @param defaultAddress the default value
|
||||
* @param defaultPort the default port
|
||||
* @return InetSocketAddress
|
||||
*/
|
||||
@Override
|
||||
public InetSocketAddress getSocketAddr(
|
||||
String name, String defaultAddress, int defaultPort) {
|
||||
String address;
|
||||
if (HAUtil.isHAEnabled(this)) {
|
||||
address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
|
||||
} else {
|
||||
address = get(name, defaultAddress);
|
||||
}
|
||||
return NetUtils.createSocketAddr(address, defaultPort, name);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,8 +38,11 @@ public class ClientRMProxy<T> extends RMProxy<T> {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
|
||||
|
||||
public static <T> T createRMProxy(final Configuration conf,
|
||||
public static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol) throws IOException {
|
||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||
? (YarnConfiguration) configuration
|
||||
: new YarnConfiguration(configuration);
|
||||
InetSocketAddress rmAddress = getRMAddress(conf, protocol);
|
||||
return createRMProxy(conf, protocol, rmAddress);
|
||||
}
|
||||
|
@ -60,7 +63,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
|
|||
}
|
||||
}
|
||||
|
||||
private static InetSocketAddress getRMAddress(Configuration conf,
|
||||
private static InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||
Class<?> protocol) throws IOException {
|
||||
if (protocol == ApplicationClientProtocol.class) {
|
||||
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||
|
|
|
@ -320,13 +320,34 @@
|
|||
</property>
|
||||
|
||||
<property>
|
||||
<description>Enable RM high-availability. When enabled, the RM starts
|
||||
in the Standby mode by default, and transitions to the Active mode when
|
||||
prompted to.</description>
|
||||
<description>Enable RM high-availability. When enabled,
|
||||
(1) The RM starts in the Standby mode by default, and transitions to
|
||||
the Active mode when prompted to.
|
||||
(2) The nodes in the RM ensemble are listed in
|
||||
yarn.resourcemanager.ha.rm-ids
|
||||
(3) The id of each RM comes from yarn.resourcemanager.ha.id
|
||||
(4) The actual physical addresses come from the configs of the pattern
|
||||
- {rpc-config}.{id}</description>
|
||||
<name>yarn.resourcemanager.ha.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The list of RM nodes in the cluster when HA is
|
||||
enabled. See description of yarn.resourcemanager.ha
|
||||
.enabled for full details on how this is used.</description>
|
||||
<name>yarn.resourcemanager.ha.rm-ids</name>
|
||||
<!--value>rm1,rm2</value-->
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The id (string) of the current RM. When HA is enabled, this
|
||||
is a required config. See description of yarn.resourcemanager.ha.enabled
|
||||
for full details on how this is used.</description>
|
||||
<name>yarn.resourcemanager.ha.id</name>
|
||||
<!--value>rm1</value-->
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The maximum number of completed applications RM keeps. </description>
|
||||
<name>yarn.resourcemanager.max-completed-applications</name>
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.conf;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestHAUtil {
|
||||
private Configuration conf;
|
||||
|
||||
private static final String RM1_ADDRESS = "1.2.3.4:8021";
|
||||
private static final String RM2_ADDRESS = "localhost:8022";
|
||||
private static final String RM1_NODE_ID = "rm1";
|
||||
private static final String RM2_NODE_ID = "rm2";
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||
|
||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
|
||||
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRMServiceId() throws Exception {
|
||||
Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
|
||||
assertEquals(2, rmhaIds.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRMId() throws Exception {
|
||||
assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
|
||||
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
||||
conf = new YarnConfiguration();
|
||||
try {
|
||||
HAUtil.getRMHAId(conf);
|
||||
fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set");
|
||||
} catch (YarnRuntimeException yre) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetGetRpcAddresses() throws Exception {
|
||||
HAUtil.setAllRpcAddresses(conf);
|
||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||
assertEquals("RPC address not set for " + confKey,
|
||||
RM1_ADDRESS, conf.get(confKey));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -31,13 +31,17 @@ public class ServerRMProxy<T> extends RMProxy<T> {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
|
||||
|
||||
public static <T> T createRMProxy(final Configuration conf,
|
||||
public static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol) throws IOException {
|
||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||
? (YarnConfiguration) configuration
|
||||
: new YarnConfiguration(configuration);
|
||||
InetSocketAddress rmAddress = getRMAddress(conf, protocol);
|
||||
return createRMProxy(conf, protocol, rmAddress);
|
||||
}
|
||||
|
||||
private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
|
||||
private static InetSocketAddress getRMAddress(YarnConfiguration conf,
|
||||
Class<?> protocol) {
|
||||
if (protocol == ResourceTracker.class) {
|
||||
return conf.getSocketAddr(
|
||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||
|
|
|
@ -29,8 +29,8 @@ import org.apache.hadoop.ha.HAServiceProtocol;
|
|||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -44,6 +44,7 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
private ResourceManager rm;
|
||||
@VisibleForTesting
|
||||
protected HAServiceState haState = HAServiceState.INITIALIZING;
|
||||
private boolean haEnabled;
|
||||
|
||||
public RMHAProtocolService(ResourceManager resourceManager) {
|
||||
super("RMHAProtocolService");
|
||||
|
@ -51,17 +52,20 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
protected synchronized void serviceInit(Configuration conf) throws
|
||||
Exception {
|
||||
this.conf = conf;
|
||||
haEnabled = HAUtil.isHAEnabled(this.conf);
|
||||
if (haEnabled) {
|
||||
HAUtil.setAllRpcAddresses(this.conf);
|
||||
rm.setConf(this.conf);
|
||||
}
|
||||
rm.createAndInitActiveServices();
|
||||
super.serviceInit(this.conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void serviceStart() throws Exception {
|
||||
boolean haEnabled = this.conf.getBoolean(YarnConfiguration.RM_HA_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
|
||||
|
||||
protected synchronized void serviceStart() throws Exception {
|
||||
if (haEnabled) {
|
||||
transitionToStandby(true);
|
||||
} else {
|
||||
|
@ -72,7 +76,7 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void serviceStop() throws Exception {
|
||||
protected synchronized void serviceStop() throws Exception {
|
||||
transitionToStandby(false);
|
||||
haState = HAServiceState.STOPPING;
|
||||
super.serviceStop();
|
||||
|
|
|
@ -731,6 +731,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
webApp = builder.start(new RMWebApp(this));
|
||||
}
|
||||
|
||||
void setConf(Configuration configuration) {
|
||||
conf = configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create and init {@link #activeServices}. This creates an
|
||||
* instance of {@link RMActiveServices} and initializes it.
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -42,10 +43,19 @@ public class TestRMHA {
|
|||
private static final String STATE_ERR =
|
||||
"ResourceManager is in wrong HA state";
|
||||
|
||||
private static final String RM1_ADDRESS = "0.0.0.0:0";
|
||||
private static final String RM1_NODE_ID = "rm1";
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
|
||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
|
||||
}
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||
|
||||
rm = new MockRM(conf);
|
||||
rm.init(conf);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue