Merge r1534884 from trunk to branch-2 YARN-1305. RMHAProtocolService#serviceInit should handle HAUtil's IllegalArgumentException (Tsuyoshi Ozawa via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1534887 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6d2ef587d0
commit
7f27c99a44
|
@ -67,6 +67,9 @@ Release 2.3.0 - UNRELEASED
|
|||
YARN-1183. MiniYARNCluster shutdown takes several minutes intermittently
|
||||
(Andrey Klochkov via jeagles)
|
||||
|
||||
YARN-1305. RMHAProtocolService#serviceInit should handle HAUtil's
|
||||
IllegalArgumentException (Tsuyoshi Ozawa via bikas)
|
||||
|
||||
Release 2.2.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -42,10 +43,13 @@ public class HAUtil {
|
|||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||
YarnConfiguration.RM_WEBAPP_ADDRESS));
|
||||
|
||||
public static final String BAD_CONFIG_MESSAGE_PREFIX =
|
||||
"Invalid configuration! ";
|
||||
|
||||
private HAUtil() { /* Hidden constructor */ }
|
||||
|
||||
private static void throwBadConfigurationException(String msg) {
|
||||
throw new YarnRuntimeException("Invalid configuration! " + msg);
|
||||
throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -59,29 +63,137 @@ public class HAUtil {
|
|||
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
|
||||
}
|
||||
|
||||
public static Collection<String> getRMHAIds(Configuration conf) {
|
||||
return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
|
||||
/**
|
||||
* Verify configuration for Resource Manager HA.
|
||||
* @param conf Configuration
|
||||
* @throws YarnRuntimeException
|
||||
*/
|
||||
public static void verifyAndSetConfiguration(Configuration conf)
|
||||
throws YarnRuntimeException {
|
||||
verifyAndSetRMHAIds(conf);
|
||||
verifyAndSetRMHAId(conf);
|
||||
verifyAndSetAllRpcAddresses(conf);
|
||||
}
|
||||
|
||||
|
||||
private static void verifyAndSetRMHAIds(Configuration conf) {
|
||||
Collection<String> ids =
|
||||
conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
|
||||
if (ids.size() <= 0) {
|
||||
throwBadConfigurationException(
|
||||
getInvalidValueMessage(YarnConfiguration.RM_HA_IDS,
|
||||
conf.get(YarnConfiguration.RM_HA_IDS)));
|
||||
} else if (ids.size() == 1) {
|
||||
LOG.warn(getRMHAIdsWarningMessage(ids.toString()));
|
||||
}
|
||||
|
||||
StringBuilder setValue = new StringBuilder();
|
||||
for (String id: ids) {
|
||||
setValue.append(id);
|
||||
setValue.append(",");
|
||||
}
|
||||
conf.set(YarnConfiguration.RM_HA_IDS,
|
||||
setValue.substring(0, setValue.length() - 1));
|
||||
}
|
||||
|
||||
private static void verifyAndSetRMHAId(Configuration conf) {
|
||||
String rmId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
|
||||
if (rmId == null) {
|
||||
throwBadConfigurationException(
|
||||
getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID));
|
||||
} else {
|
||||
Collection<String> ids = getRMHAIds(conf);
|
||||
if (!ids.contains(rmId)) {
|
||||
throwBadConfigurationException(
|
||||
getRMHAIdNeedToBeIncludedMessage(ids.toString(), rmId));
|
||||
}
|
||||
}
|
||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||
}
|
||||
|
||||
private static void verifyAndSetConfValue(String prefix, Configuration conf) {
|
||||
String confKey = null;
|
||||
String confValue = null;
|
||||
try {
|
||||
confKey = getConfKeyForRMInstance(prefix, conf);
|
||||
confValue = getConfValueForRMInstance(prefix, conf);
|
||||
conf.set(prefix, confValue);
|
||||
} catch (YarnRuntimeException yre) {
|
||||
// Error at getRMHAId()
|
||||
throw yre;
|
||||
} catch (IllegalArgumentException iae) {
|
||||
String errmsg;
|
||||
if (confKey == null) {
|
||||
// Error at addSuffix
|
||||
errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
|
||||
getRMHAId(conf));
|
||||
} else {
|
||||
// Error at Configuration#set.
|
||||
errmsg = getNeedToSetValueMessage(confKey);
|
||||
}
|
||||
throwBadConfigurationException(errmsg);
|
||||
}
|
||||
}
|
||||
|
||||
public static void verifyAndSetAllRpcAddresses(Configuration conf) {
|
||||
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
|
||||
verifyAndSetConfValue(confKey, conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf Configuration
|
||||
* @param conf Configuration. Please use getRMHAIds to check.
|
||||
* @return RM Ids on success
|
||||
*/
|
||||
public static Collection<String> getRMHAIds(Configuration conf) {
|
||||
return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf Configuration. Please use verifyAndSetRMHAId to check.
|
||||
* @return RM Id on success
|
||||
* @throws YarnRuntimeException for configurations without a node id
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static String getRMHAId(Configuration conf) {
|
||||
String rmId = conf.get(YarnConfiguration.RM_HA_ID);
|
||||
if (rmId == null) {
|
||||
throwBadConfigurationException(YarnConfiguration.RM_HA_ID +
|
||||
" needs to be set in a HA configuration");
|
||||
}
|
||||
return rmId;
|
||||
static String getRMHAId(Configuration conf) {
|
||||
return conf.get(YarnConfiguration.RM_HA_ID);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getNeedToSetValueMessage(String confKey) {
|
||||
return confKey + " needs to be set in a HA configuration.";
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getInvalidValueMessage(String confKey,
|
||||
String invalidValue){
|
||||
return "Invalid value of " + confKey +". "
|
||||
+ "Current value is " + invalidValue;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getRMHAIdNeedToBeIncludedMessage(String ids,
|
||||
String rmId) {
|
||||
return YarnConfiguration.RM_HA_IDS + "("
|
||||
+ ids + ") need to contain " + YarnConfiguration.RM_HA_ID + "("
|
||||
+ rmId + ") in a HA configuration.";
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getRMHAIdsWarningMessage(String ids) {
|
||||
return "Resource Manager HA is enabled, but " +
|
||||
YarnConfiguration.RM_HA_IDS + " has only one id(" +
|
||||
ids.toString() + ")";
|
||||
}
|
||||
|
||||
private static String getConfKeyForRMInstance(String prefix,
|
||||
Configuration conf) {
|
||||
return addSuffix(prefix, getRMHAId(conf));
|
||||
}
|
||||
|
||||
private static String getConfValueForRMInstance(String prefix,
|
||||
Configuration conf) {
|
||||
String confKey = addSuffix(prefix, getRMHAId(conf));
|
||||
String retVal = conf.get(confKey);
|
||||
String confKey = getConfKeyForRMInstance(prefix, conf);
|
||||
String retVal = conf.getTrimmed(confKey);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
|
||||
"; confKey being looked up = " + confKey +
|
||||
|
@ -96,16 +208,6 @@ public class HAUtil {
|
|||
return (value == null) ? defaultValue : value;
|
||||
}
|
||||
|
||||
private static void setConfValue(String prefix, Configuration conf) {
|
||||
conf.set(prefix, getConfValueForRMInstance(prefix, conf));
|
||||
}
|
||||
|
||||
public static void setAllRpcAddresses(Configuration conf) {
|
||||
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
|
||||
setConfValue(confKey, conf);
|
||||
}
|
||||
}
|
||||
|
||||
/** Add non empty and non null suffix to a key */
|
||||
@VisibleForTesting
|
||||
public static String addSuffix(String key, String suffix) {
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.conf;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -27,53 +28,134 @@ import org.junit.Test;
|
|||
import java.util.Collection;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestHAUtil {
|
||||
private Configuration conf;
|
||||
|
||||
private static final String RM1_ADDRESS = "1.2.3.4:8021";
|
||||
private static final String RM1_ADDRESS_UNTRIMMED = " \t\t\n 1.2.3.4:8021 \n\t ";
|
||||
private static final String RM1_ADDRESS = RM1_ADDRESS_UNTRIMMED.trim();
|
||||
private static final String RM2_ADDRESS = "localhost:8022";
|
||||
private static final String RM1_NODE_ID = "rm1";
|
||||
private static final String RM1_NODE_ID_UNTRIMMED = "rm1 ";
|
||||
private static final String RM1_NODE_ID = RM1_NODE_ID_UNTRIMMED.trim();
|
||||
private static final String RM2_NODE_ID = "rm2";
|
||||
private static final String RM3_NODE_ID = "rm3";
|
||||
private static final String RM_INVALID_NODE_ID = ".rm";
|
||||
private static final String RM_NODE_IDS_UNTRIMMED = RM1_NODE_ID_UNTRIMMED + "," + RM2_NODE_ID;
|
||||
private static final String RM_NODE_IDS = RM1_NODE_ID + "," + RM2_NODE_ID;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
||||
|
||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
|
||||
// configuration key itself cannot contains space/tab/return chars.
|
||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
||||
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRMServiceId() throws Exception {
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||
Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
|
||||
assertEquals(2, rmhaIds.size());
|
||||
|
||||
String[] ids = rmhaIds.toArray(new String[0]);
|
||||
assertEquals(RM1_NODE_ID, ids[0]);
|
||||
assertEquals(RM2_NODE_ID, ids[1]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRMId() throws Exception {
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||
assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
|
||||
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
||||
conf = new YarnConfiguration();
|
||||
try {
|
||||
HAUtil.getRMHAId(conf);
|
||||
fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set");
|
||||
} catch (YarnRuntimeException yre) {
|
||||
// do nothing
|
||||
}
|
||||
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
||||
|
||||
conf.clear();
|
||||
assertNull("Return null when " + YarnConfiguration.RM_HA_ID
|
||||
+ " is not set", HAUtil.getRMHAId(conf));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetGetRpcAddresses() throws Exception {
|
||||
HAUtil.setAllRpcAddresses(conf);
|
||||
public void testVerifyAndSetConfiguration() throws Exception {
|
||||
try {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
} catch (YarnRuntimeException e) {
|
||||
fail("Should not throw any exceptions.");
|
||||
}
|
||||
|
||||
assertEquals("Should be saved as Trimmed collection",
|
||||
StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
|
||||
assertEquals("Should be saved as Trimmed string",
|
||||
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||
assertEquals("RPC address not set for " + confKey,
|
||||
RM1_ADDRESS, conf.get(confKey));
|
||||
RM1_ADDRESS, conf.get(confKey));
|
||||
}
|
||||
|
||||
conf.clear();
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
|
||||
try {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
} catch (YarnRuntimeException e) {
|
||||
assertEquals("YarnRuntimeException by getRMId()",
|
||||
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
|
||||
HAUtil.getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID),
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
conf.clear();
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
|
||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||
// simulate xml with invalid node id
|
||||
conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
|
||||
}
|
||||
try {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
} catch (YarnRuntimeException e) {
|
||||
assertEquals("YarnRuntimeException by addSuffix()",
|
||||
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
|
||||
HAUtil.getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
|
||||
RM_INVALID_NODE_ID),
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
conf.clear();
|
||||
// simulate the case HAUtil.RPC_ADDRESS_CONF_KEYS are not set
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
|
||||
try {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
fail("Should throw YarnRuntimeException. by Configuration#set()");
|
||||
} catch (YarnRuntimeException e) {
|
||||
String confKey =
|
||||
HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID);
|
||||
assertEquals("YarnRuntimeException by Configuration#set()",
|
||||
HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(confKey),
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
// simulate the case YarnConfiguration.RM_HA_IDS doesn't contain
|
||||
// the value of YarnConfiguration.RM_HA_ID
|
||||
conf.clear();
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
|
||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
||||
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||
}
|
||||
try {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
} catch (YarnRuntimeException e) {
|
||||
assertEquals("YarnRuntimeException by getRMId()'s validation",
|
||||
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
|
||||
HAUtil.getRMHAIdNeedToBeIncludedMessage("[rm2, rm3]", RM1_NODE_ID),
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
this.conf = conf;
|
||||
haEnabled = HAUtil.isHAEnabled(this.conf);
|
||||
if (haEnabled) {
|
||||
HAUtil.setAllRpcAddresses(this.conf);
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
rm.setConf(this.conf);
|
||||
}
|
||||
rm.createAndInitActiveServices();
|
||||
|
|
Loading…
Reference in New Issue