YARN-1660. Simplified the RM HA configuration to accept and be able to simply depend just on
configuration properties of the form yarn.resourcemanager.hostname.RMID and use the default ports for all service addresses. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1565523 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
30294a2196
commit
8b2336fcef
|
@ -142,6 +142,11 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1665. Simplify the configuration of RM HA by having better default
|
||||
values. (Xuan Gong via vinodkv)
|
||||
|
||||
YARN-1660. Simplified the RM HA configuration to accept and be able to simply
|
||||
depend just on configuration properties of the form
|
||||
yarn.resourcemanager.hostname.RMID and use the default ports for all service
|
||||
addresses. (Xuan Gong via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -101,20 +101,7 @@ public class HAUtil {
|
|||
for (String id: ids) {
|
||||
// verify the RM service addresses configurations for every RMIds
|
||||
for (String prefix : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
|
||||
String confKey = null;
|
||||
try {
|
||||
confKey = addSuffix(prefix, id);
|
||||
if (conf.getTrimmed(confKey) == null) {
|
||||
throwBadConfigurationException(getNeedToSetValueMessage(confKey));
|
||||
}
|
||||
} catch (IllegalArgumentException iae) {
|
||||
String errmsg = iae.getMessage();
|
||||
if (confKey == null) {
|
||||
// Error at addSuffix
|
||||
errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID, id);
|
||||
}
|
||||
throwBadConfigurationException(errmsg);
|
||||
}
|
||||
checkAndSetRMRPCAddress(prefix, id, conf);
|
||||
}
|
||||
setValue.append(id);
|
||||
setValue.append(",");
|
||||
|
@ -249,9 +236,13 @@ public class HAUtil {
|
|||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
static String getConfKeyForRMInstance(String prefix, Configuration conf) {
|
||||
return YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS.contains(prefix)
|
||||
? addSuffix(prefix, getRMHAId(conf))
|
||||
: prefix;
|
||||
if (!YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS.contains(prefix)) {
|
||||
return prefix;
|
||||
} else {
|
||||
String RMId = getRMHAId(conf);
|
||||
checkAndSetRMRPCAddress(prefix, RMId, conf);
|
||||
return addSuffix(prefix, RMId);
|
||||
}
|
||||
}
|
||||
|
||||
public static String getConfValueForRMInstance(String prefix,
|
||||
|
@ -284,4 +275,30 @@ public class HAUtil {
|
|||
}
|
||||
return key + "." + suffix;
|
||||
}
|
||||
|
||||
private static void checkAndSetRMRPCAddress(String prefix, String RMId,
|
||||
Configuration conf) {
|
||||
String rpcAddressConfKey = null;
|
||||
try {
|
||||
rpcAddressConfKey = addSuffix(prefix, RMId);
|
||||
if (conf.getTrimmed(rpcAddressConfKey) == null) {
|
||||
String hostNameConfKey = addSuffix(YarnConfiguration.RM_HOSTNAME, RMId);
|
||||
String confVal = conf.getTrimmed(hostNameConfKey);
|
||||
if (confVal == null) {
|
||||
throwBadConfigurationException(getNeedToSetValueMessage(
|
||||
hostNameConfKey + " or " + addSuffix(prefix, RMId)));
|
||||
} else {
|
||||
conf.set(addSuffix(prefix, RMId), confVal + ":"
|
||||
+ YarnConfiguration.getRMDefaultPortNumber(prefix));
|
||||
}
|
||||
}
|
||||
} catch (IllegalArgumentException iae) {
|
||||
String errmsg = iae.getMessage();
|
||||
if (rpcAddressConfKey == null) {
|
||||
// Error at addSuffix
|
||||
errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID, RMId);
|
||||
}
|
||||
throwBadConfigurationException(errmsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -108,6 +109,8 @@ public class YarnConfiguration extends Configuration {
|
|||
|
||||
public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
|
||||
|
||||
public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
|
||||
|
||||
/** The address of the applications manager interface in the RM.*/
|
||||
public static final String RM_ADDRESS =
|
||||
RM_PREFIX + "address";
|
||||
|
@ -1139,4 +1142,27 @@ public class YarnConfiguration extends Configuration {
|
|||
}
|
||||
return super.updateConnectAddr(prefix, addr);
|
||||
}
|
||||
|
||||
@Private
|
||||
public static int getRMDefaultPortNumber(String addressPrefix) {
|
||||
if (addressPrefix.equals(YarnConfiguration.RM_ADDRESS)) {
|
||||
return YarnConfiguration.DEFAULT_RM_PORT;
|
||||
} else if (addressPrefix.equals(YarnConfiguration.RM_SCHEDULER_ADDRESS)) {
|
||||
return YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT;
|
||||
} else if (addressPrefix.equals(YarnConfiguration.RM_WEBAPP_ADDRESS)) {
|
||||
return YarnConfiguration.DEFAULT_RM_WEBAPP_PORT;
|
||||
} else if (addressPrefix.equals(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS)) {
|
||||
return YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT;
|
||||
} else if (addressPrefix
|
||||
.equals(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS)) {
|
||||
return YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT;
|
||||
} else if (addressPrefix.equals(YarnConfiguration.RM_ADMIN_ADDRESS)) {
|
||||
return YarnConfiguration.DEFAULT_RM_ADMIN_PORT;
|
||||
} else {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid RM RPC address Prefix: " + addressPrefix
|
||||
+ ". The valid value should be one of "
|
||||
+ YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -159,8 +159,9 @@ public class TestHAUtil {
|
|||
String confKey =
|
||||
HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID);
|
||||
assertEquals("YarnRuntimeException by Configuration#set()",
|
||||
HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(confKey),
|
||||
e.getMessage());
|
||||
HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(
|
||||
HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, RM1_NODE_ID)
|
||||
+ " or " + confKey), e.getMessage());
|
||||
}
|
||||
|
||||
// simulate the case YarnConfiguration.RM_HA_IDS doesn't contain
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -314,6 +315,57 @@ public class TestRMHA {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHAWithRMHostName() {
|
||||
//test if both RM_HOSTBANE_{rm_id} and RM_RPCADDRESS_{rm_id} are set
|
||||
//We should only read rpc addresses from RM_RPCADDRESS_{rm_id} configuration
|
||||
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
|
||||
RM1_NODE_ID), "1.1.1.1");
|
||||
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
|
||||
RM2_NODE_ID), "0.0.0.0");
|
||||
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
|
||||
RM3_NODE_ID), "2.2.2.2");
|
||||
try {
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
rm = new MockRM(conf);
|
||||
rm.init(conf);
|
||||
for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
|
||||
assertEquals("RPC address not set for " + confKey,
|
||||
RM1_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM1_NODE_ID)));
|
||||
assertEquals("RPC address not set for " + confKey,
|
||||
RM2_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM2_NODE_ID)));
|
||||
assertEquals("RPC address not set for " + confKey,
|
||||
RM3_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM3_NODE_ID)));
|
||||
}
|
||||
} catch (YarnRuntimeException e) {
|
||||
fail("Should not throw any exceptions.");
|
||||
}
|
||||
|
||||
//test if only RM_HOSTBANE_{rm_id} is set
|
||||
configuration.clear();
|
||||
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + ","
|
||||
+ RM2_NODE_ID);
|
||||
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
|
||||
RM1_NODE_ID), "1.1.1.1");
|
||||
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
|
||||
RM2_NODE_ID), "0.0.0.0");
|
||||
try {
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
rm = new MockRM(conf);
|
||||
rm.init(conf);
|
||||
assertEquals("RPC address not set for " + YarnConfiguration.RM_ADDRESS,
|
||||
"1.1.1.1:8032",
|
||||
conf.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
|
||||
assertEquals("RPC address not set for " + YarnConfiguration.RM_ADDRESS,
|
||||
"0.0.0.0:8032",
|
||||
conf.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
|
||||
|
||||
} catch (YarnRuntimeException e) {
|
||||
fail("Should not throw any exceptions.");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
class MyCountingDispatcher extends AbstractService implements Dispatcher {
|
||||
|
||||
|
|
Loading…
Reference in New Issue