YARN-1666. Modified RM HA handling of include/exclude node-lists to be available across RM failover by making using of a remote configuration-provider. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1569856 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
996acc834e
commit
9da9f7d4d8
|
@ -25,6 +25,7 @@ import java.util.HashSet;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
// Keeps track of which datanodes/tasktrackers are allowed to connect to the
|
||||
|
@ -48,13 +49,30 @@ public class HostsFileReader {
|
|||
refresh();
|
||||
}
|
||||
|
||||
@Private
|
||||
public HostsFileReader(String includesFile, InputStream inFileInputStream,
|
||||
String excludesFile, InputStream exFileInputStream) throws IOException {
|
||||
includes = new HashSet<String>();
|
||||
excludes = new HashSet<String>();
|
||||
this.includesFile = includesFile;
|
||||
this.excludesFile = excludesFile;
|
||||
refresh(inFileInputStream, exFileInputStream);
|
||||
}
|
||||
|
||||
public static void readFileToSet(String type,
|
||||
String filename, Set<String> set) throws IOException {
|
||||
File file = new File(filename);
|
||||
FileInputStream fis = new FileInputStream(file);
|
||||
readFileToSetWithFileInputStream(type, filename, fis, set);
|
||||
}
|
||||
|
||||
@Private
|
||||
public static void readFileToSetWithFileInputStream(String type,
|
||||
String filename, InputStream fileInputStream, Set<String> set)
|
||||
throws IOException {
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
reader = new BufferedReader(new InputStreamReader(fis));
|
||||
reader = new BufferedReader(new InputStreamReader(fileInputStream));
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
String[] nodes = line.split("[ \t\n\f\r]+");
|
||||
|
@ -71,26 +89,63 @@ public class HostsFileReader {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
fis.close();
|
||||
}
|
||||
fileInputStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void refresh() throws IOException {
|
||||
LOG.info("Refreshing hosts (include/exclude) list");
|
||||
Set<String> newIncludes = new HashSet<String>();
|
||||
Set<String> newExcludes = new HashSet<String>();
|
||||
boolean switchIncludes = false;
|
||||
boolean switchExcludes = false;
|
||||
if (!includesFile.isEmpty()) {
|
||||
Set<String> newIncludes = new HashSet<String>();
|
||||
readFileToSet("included", includesFile, newIncludes);
|
||||
switchIncludes = true;
|
||||
}
|
||||
if (!excludesFile.isEmpty()) {
|
||||
readFileToSet("excluded", excludesFile, newExcludes);
|
||||
switchExcludes = true;
|
||||
}
|
||||
|
||||
if (switchIncludes) {
|
||||
// switch the new hosts that are to be included
|
||||
includes = newIncludes;
|
||||
}
|
||||
if (!excludesFile.isEmpty()) {
|
||||
Set<String> newExcludes = new HashSet<String>();
|
||||
readFileToSet("excluded", excludesFile, newExcludes);
|
||||
if (switchExcludes) {
|
||||
// switch the excluded hosts
|
||||
excludes = newExcludes;
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
public synchronized void refresh(InputStream inFileInputStream,
|
||||
InputStream exFileInputStream) throws IOException {
|
||||
LOG.info("Refreshing hosts (include/exclude) list");
|
||||
Set<String> newIncludes = new HashSet<String>();
|
||||
Set<String> newExcludes = new HashSet<String>();
|
||||
boolean switchIncludes = false;
|
||||
boolean switchExcludes = false;
|
||||
if (inFileInputStream != null) {
|
||||
readFileToSetWithFileInputStream("included", includesFile,
|
||||
inFileInputStream, newIncludes);
|
||||
switchIncludes = true;
|
||||
}
|
||||
if (exFileInputStream != null) {
|
||||
readFileToSetWithFileInputStream("excluded", excludesFile,
|
||||
exFileInputStream, newExcludes);
|
||||
switchExcludes = true;
|
||||
}
|
||||
if (switchIncludes) {
|
||||
// switch the new hosts that are to be included
|
||||
includes = newIncludes;
|
||||
}
|
||||
if (switchExcludes) {
|
||||
// switch the excluded hosts
|
||||
excludes = newExcludes;
|
||||
}
|
||||
|
|
|
@ -202,6 +202,10 @@ Release 2.4.0 - UNRELEASED
|
|||
be available across RM failover by making using of a remote
|
||||
configuration-provider. (Xuan Gong via vinodkv)
|
||||
|
||||
YARN-1666. Modified RM HA handling of include/exclude node-lists to be
|
||||
available across RM failover by making using of a remote
|
||||
configuration-provider. (Xuan Gong via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.yarn.conf;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -42,15 +44,16 @@ public abstract class ConfigurationProvider {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the configuration and combine with bootstrapConf
|
||||
* Opens an InputStream at the indicated file
|
||||
* @param bootstrapConf Configuration
|
||||
* @param name The configuration file name
|
||||
* @return configuration
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Configuration getConfiguration(Configuration bootstrapConf,
|
||||
String name) throws YarnException, IOException;
|
||||
public abstract InputStream getConfigurationInputStream(
|
||||
Configuration bootstrapConf, String name) throws YarnException,
|
||||
IOException;
|
||||
|
||||
/**
|
||||
* Derived classes initialize themselves using this method.
|
||||
|
|
|
@ -45,22 +45,31 @@ public class YarnConfiguration extends Configuration {
|
|||
"hadoop-policy.xml";
|
||||
|
||||
@Private
|
||||
public static final String YARN_SITE_XML_FILE = "yarn-site.xml";
|
||||
public static final String YARN_SITE_CONFIGURATION_FILE = "yarn-site.xml";
|
||||
|
||||
private static final String YARN_DEFAULT_CONFIGURATION_FILE =
|
||||
"yarn-default.xml";
|
||||
|
||||
@Private
|
||||
public static final String CORE_SITE_CONFIGURATION_FILE = "core-site.xml";
|
||||
|
||||
@Private
|
||||
public static final List<String> RM_CONFIGURATION_FILES =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
CS_CONFIGURATION_FILE,
|
||||
HADOOP_POLICY_CONFIGURATION_FILE,
|
||||
YARN_SITE_CONFIGURATION_FILE,
|
||||
CORE_SITE_CONFIGURATION_FILE));
|
||||
|
||||
@Evolving
|
||||
public static final int APPLICATION_MAX_TAGS = 10;
|
||||
|
||||
@Evolving
|
||||
public static final int APPLICATION_MAX_TAG_LENGTH = 100;
|
||||
|
||||
private static final String YARN_DEFAULT_XML_FILE = "yarn-default.xml";
|
||||
|
||||
static {
|
||||
Configuration.addDefaultResource(YARN_DEFAULT_XML_FILE);
|
||||
Configuration.addDefaultResource(YARN_SITE_XML_FILE);
|
||||
Configuration.addDefaultResource(YARN_DEFAULT_CONFIGURATION_FILE);
|
||||
Configuration.addDefaultResource(YARN_SITE_CONFIGURATION_FILE);
|
||||
}
|
||||
|
||||
//Configurations
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.yarn;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -41,15 +43,27 @@ public class FileSystemBasedConfigurationProvider
|
|||
private Path configDir;
|
||||
|
||||
@Override
|
||||
public synchronized Configuration getConfiguration(Configuration bootstrapConf,
|
||||
String name) throws IOException, YarnException {
|
||||
Path configPath = new Path(this.configDir, name);
|
||||
if (!fs.exists(configPath)) {
|
||||
throw new YarnException("Can not find Configuration: " + name + " in "
|
||||
+ configDir);
|
||||
public synchronized InputStream getConfigurationInputStream(
|
||||
Configuration bootstrapConf, String name) throws IOException,
|
||||
YarnException {
|
||||
if (name == null || name.isEmpty()) {
|
||||
throw new YarnException(
|
||||
"Illegal argument! The parameter should not be null or empty");
|
||||
}
|
||||
bootstrapConf.addResource(fs.open(configPath));
|
||||
return bootstrapConf;
|
||||
Path filePath;
|
||||
if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name)) {
|
||||
filePath = new Path(this.configDir, name);
|
||||
if (!fs.exists(filePath)) {
|
||||
throw new YarnException("Can not find Configuration: " + name + " in "
|
||||
+ configDir);
|
||||
}
|
||||
} else {
|
||||
filePath = new Path(name);
|
||||
if (!fs.exists(filePath)) {
|
||||
throw new YarnException("Can not find file: " + name);
|
||||
}
|
||||
}
|
||||
return fs.open(filePath);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,11 +18,15 @@
|
|||
|
||||
package org.apache.hadoop.yarn;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
@Private
|
||||
|
@ -30,9 +34,15 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
public class LocalConfigurationProvider extends ConfigurationProvider {
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration(Configuration bootstrapConf,
|
||||
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
String name) throws IOException, YarnException {
|
||||
return bootstrapConf;
|
||||
if (name == null || name.isEmpty()) {
|
||||
throw new YarnException(
|
||||
"Illegal argument! The parameter should not be null or empty");
|
||||
} else if (YarnConfiguration.RM_CONFIGURATION_FILES.contains(name)) {
|
||||
return bootstrapConf.getConfResourceAsInputStream(name);
|
||||
}
|
||||
return new FileInputStream(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,7 +26,6 @@ import java.util.Set;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
|
@ -313,9 +312,7 @@ public class AdminService extends CompositeService implements
|
|||
RefreshQueuesResponse response =
|
||||
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
|
||||
try {
|
||||
Configuration conf = getConfiguration(getConfig(),
|
||||
YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||
rmContext.getScheduler().reinitialize(conf, this.rmContext);
|
||||
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
|
||||
"AdminService");
|
||||
return response;
|
||||
|
@ -331,23 +328,27 @@ public class AdminService extends CompositeService implements
|
|||
@Override
|
||||
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
throws YarnException, StandbyException {
|
||||
String argName = "refreshNodes";
|
||||
UserGroupInformation user = checkAcls("refreshNodes");
|
||||
|
||||
if (!isRMActive()) {
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"ResourceManager is not active. Can not refresh nodes.");
|
||||
throwStandbyException();
|
||||
}
|
||||
|
||||
try {
|
||||
rmContext.getNodesListManager().refreshNodes(new YarnConfiguration());
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes",
|
||||
Configuration conf =
|
||||
getConfiguration(new Configuration(false),
|
||||
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
|
||||
rmContext.getNodesListManager().refreshNodes(conf);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
|
||||
"AdminService");
|
||||
return recordFactory.newRecordInstance(RefreshNodesResponse.class);
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Exception refreshing nodes ", ioe);
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService", "Exception refreshing nodes");
|
||||
throw RPCUtil.getRemoteException(ioe);
|
||||
}
|
||||
|
@ -368,7 +369,7 @@ public class AdminService extends CompositeService implements
|
|||
}
|
||||
|
||||
Configuration conf =
|
||||
getConfiguration(getConfig(),
|
||||
getConfiguration(new Configuration(false),
|
||||
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
|
||||
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
|
@ -393,7 +394,7 @@ public class AdminService extends CompositeService implements
|
|||
}
|
||||
|
||||
Groups.getUserToGroupsMappingService(
|
||||
getConfiguration(getConfig(),
|
||||
getConfiguration(new Configuration(false),
|
||||
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
|
||||
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService");
|
||||
|
@ -415,7 +416,8 @@ public class AdminService extends CompositeService implements
|
|||
throwStandbyException();
|
||||
}
|
||||
Configuration conf =
|
||||
getConfiguration(getConfig(), YarnConfiguration.YARN_SITE_XML_FILE);
|
||||
getConfiguration(new Configuration(false),
|
||||
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
|
||||
adminAcl = new AccessControlList(conf.get(
|
||||
YarnConfiguration.YARN_ADMIN_ACL,
|
||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||
|
@ -448,7 +450,7 @@ public class AdminService extends CompositeService implements
|
|||
|
||||
PolicyProvider policyProvider = RMPolicyProvider.getInstance();
|
||||
Configuration conf =
|
||||
getConfiguration(getConfig(),
|
||||
getConfiguration(new Configuration(false),
|
||||
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
|
||||
|
||||
refreshServiceAcls(conf, policyProvider);
|
||||
|
@ -463,13 +465,8 @@ public class AdminService extends CompositeService implements
|
|||
|
||||
private synchronized void refreshServiceAcls(Configuration configuration,
|
||||
PolicyProvider policyProvider) {
|
||||
if (this.rmContext.getConfigurationProvider() instanceof
|
||||
LocalConfigurationProvider) {
|
||||
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||
} else {
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -519,8 +516,9 @@ public class AdminService extends CompositeService implements
|
|||
|
||||
private synchronized Configuration getConfiguration(Configuration conf,
|
||||
String confFileName) throws YarnException, IOException {
|
||||
return this.rmContext.getConfigurationProvider().getConfiguration(conf,
|
||||
confFileName);
|
||||
conf.addResource(this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(conf, confFileName));
|
||||
return conf;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
|
|||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
|
@ -140,10 +139,10 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
if (conf.getBoolean(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
|
||||
false)) {
|
||||
refreshServiceAcls(
|
||||
this.rmContext.getConfigurationProvider().getConfiguration(conf,
|
||||
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
|
||||
RMPolicyProvider.getInstance());
|
||||
conf.addResource(this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(conf,
|
||||
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE));
|
||||
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
|
||||
}
|
||||
|
||||
this.server.start();
|
||||
|
@ -593,13 +592,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
|
||||
public void refreshServiceAcls(Configuration configuration,
|
||||
PolicyProvider policyProvider) {
|
||||
if (this.rmContext.getConfigurationProvider() instanceof
|
||||
LocalConfigurationProvider) {
|
||||
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||
} else {
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
|
@ -171,10 +170,10 @@ public class ClientRMService extends AbstractService implements
|
|||
if (conf.getBoolean(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
|
||||
false)) {
|
||||
refreshServiceAcls(
|
||||
this.rmContext.getConfigurationProvider().getConfiguration(conf,
|
||||
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
|
||||
RMPolicyProvider.getInstance());
|
||||
conf.addResource(this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(conf,
|
||||
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE));
|
||||
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
|
||||
}
|
||||
|
||||
this.server.start();
|
||||
|
@ -807,13 +806,8 @@ public class ClientRMService extends AbstractService implements
|
|||
|
||||
void refreshServiceAcls(Configuration configuration,
|
||||
PolicyProvider policyProvider) {
|
||||
if (this.rmContext.getConfigurationProvider() instanceof
|
||||
LocalConfigurationProvider) {
|
||||
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||
} else {
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
|
||||
private boolean isAllowedDelegationTokenOp() throws IOException {
|
||||
|
|
|
@ -32,12 +32,15 @@ import org.apache.hadoop.service.AbstractService;
|
|||
import org.apache.hadoop.util.HostsFileReader;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class NodesListManager extends AbstractService implements
|
||||
EventHandler<NodesListManagerEvent> {
|
||||
|
@ -51,6 +54,9 @@ public class NodesListManager extends AbstractService implements
|
|||
|
||||
private final RMContext rmContext;
|
||||
|
||||
private String includesFile;
|
||||
private String excludesFile;
|
||||
|
||||
public NodesListManager(RMContext rmContext) {
|
||||
super(NodesListManager.class.getName());
|
||||
this.rmContext = rmContext;
|
||||
|
@ -63,25 +69,17 @@ public class NodesListManager extends AbstractService implements
|
|||
|
||||
// Read the hosts/exclude files to restrict access to the RM
|
||||
try {
|
||||
this.hostsReader =
|
||||
new HostsFileReader(
|
||||
conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH),
|
||||
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)
|
||||
);
|
||||
this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
|
||||
this.excludesFile = conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||
this.hostsReader =
|
||||
createHostsFileReader(this.includesFile, this.excludesFile);
|
||||
printConfiguredHosts();
|
||||
} catch (YarnException ex) {
|
||||
disableHostsFileReader(ex);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed to init hostsReader, disabling", ioe);
|
||||
try {
|
||||
this.hostsReader =
|
||||
new HostsFileReader(YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||
} catch (IOException ioe2) {
|
||||
// Should *never* happen
|
||||
this.hostsReader = null;
|
||||
throw new YarnRuntimeException(ioe2);
|
||||
}
|
||||
disableHostsFileReader(ioe);
|
||||
}
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
@ -103,17 +101,25 @@ public class NodesListManager extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
public void refreshNodes(Configuration yarnConf) throws IOException {
|
||||
public void refreshNodes(Configuration yarnConf) throws IOException,
|
||||
YarnException {
|
||||
synchronized (hostsReader) {
|
||||
if (null == yarnConf) {
|
||||
yarnConf = new YarnConfiguration();
|
||||
}
|
||||
hostsReader.updateFileNames(yarnConf.get(
|
||||
YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH), yarnConf.get(
|
||||
YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
|
||||
hostsReader.refresh();
|
||||
includesFile =
|
||||
yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
|
||||
excludesFile =
|
||||
yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||
hostsReader.updateFileNames(includesFile, excludesFile);
|
||||
hostsReader.refresh(
|
||||
includesFile.isEmpty() ? null : this.rmContext
|
||||
.getConfigurationProvider().getConfigurationInputStream(
|
||||
this.conf, includesFile), excludesFile.isEmpty() ? null
|
||||
: this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(this.conf, excludesFile));
|
||||
printConfiguredHosts();
|
||||
}
|
||||
}
|
||||
|
@ -174,4 +180,43 @@ public class NodesListManager extends AbstractService implements
|
|||
LOG.error("Ignoring invalid eventtype " + event.getType());
|
||||
}
|
||||
}
|
||||
|
||||
private void disableHostsFileReader(Exception ex) {
|
||||
LOG.warn("Failed to init hostsReader, disabling", ex);
|
||||
try {
|
||||
this.includesFile =
|
||||
conf.get(YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH);
|
||||
this.excludesFile =
|
||||
conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||
this.hostsReader =
|
||||
createHostsFileReader(this.includesFile, this.excludesFile);
|
||||
} catch (IOException ioe2) {
|
||||
// Should *never* happen
|
||||
this.hostsReader = null;
|
||||
throw new YarnRuntimeException(ioe2);
|
||||
} catch (YarnException e) {
|
||||
// Should *never* happen
|
||||
this.hostsReader = null;
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public HostsFileReader getHostsReader() {
|
||||
return this.hostsReader;
|
||||
}
|
||||
|
||||
private HostsFileReader createHostsFileReader(String includesFile,
|
||||
String excludesFile) throws IOException, YarnException {
|
||||
HostsFileReader hostsReader =
|
||||
new HostsFileReader(includesFile,
|
||||
(includesFile == null || includesFile.isEmpty()) ? null
|
||||
: this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(this.conf, includesFile),
|
||||
excludesFile,
|
||||
(excludesFile == null || excludesFile.isEmpty()) ? null
|
||||
: this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(this.conf, excludesFile));
|
||||
return hostsReader;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,8 +30,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.http.HttpConfig.Policy;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||
import org.apache.hadoop.security.Groups;
|
||||
|
@ -44,7 +42,6 @@ import org.apache.hadoop.util.ExitUtil;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -191,19 +188,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
||||
this.configurationProvider.init(this.conf);
|
||||
rmContext.setConfigurationProvider(configurationProvider);
|
||||
if (!(this.configurationProvider instanceof LocalConfigurationProvider)) {
|
||||
// load yarn-site.xml
|
||||
this.conf =
|
||||
this.configurationProvider.getConfiguration(this.conf,
|
||||
YarnConfiguration.YARN_SITE_XML_FILE);
|
||||
// load core-site.xml
|
||||
this.conf =
|
||||
this.configurationProvider.getConfiguration(this.conf,
|
||||
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
|
||||
// Do refreshUserToGroupsMappings with loaded core-site.xml
|
||||
Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(this.conf)
|
||||
.refresh();
|
||||
}
|
||||
|
||||
// load yarn-site.xml
|
||||
this.conf.addResource(this.configurationProvider
|
||||
.getConfigurationInputStream(this.conf,
|
||||
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE));
|
||||
// load core-site.xml
|
||||
this.conf.addResource(this.configurationProvider
|
||||
.getConfigurationInputStream(this.conf,
|
||||
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE));
|
||||
// Do refreshUserToGroupsMappings with loaded core-site.xml
|
||||
Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(this.conf)
|
||||
.refresh();
|
||||
|
||||
// register the handlers for all AlwaysOn services using setupDispatcher().
|
||||
rmDispatcher = setupDispatcher();
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.hadoop.net.Node;
|
|||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.VersionUtil;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -164,10 +163,10 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
if (conf.getBoolean(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
|
||||
false)) {
|
||||
refreshServiceAcls(
|
||||
this.rmContext.getConfigurationProvider().getConfiguration(conf,
|
||||
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
|
||||
RMPolicyProvider.getInstance());
|
||||
conf.addResource(this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(conf,
|
||||
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE));
|
||||
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
|
||||
}
|
||||
|
||||
this.server.start();
|
||||
|
@ -421,13 +420,8 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
|
||||
void refreshServiceAcls(Configuration configuration,
|
||||
PolicyProvider policyProvider) {
|
||||
if (this.rmContext.getConfigurationProvider() instanceof
|
||||
LocalConfigurationProvider) {
|
||||
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||
} else {
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
|
||||
policyProvider);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configurable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
|
@ -263,19 +262,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
|
||||
Configuration configuration = new Configuration(conf);
|
||||
if (!initialized) {
|
||||
if (rmContext.getConfigurationProvider() instanceof
|
||||
LocalConfigurationProvider) {
|
||||
this.conf = new CapacitySchedulerConfiguration(configuration, true);
|
||||
} else {
|
||||
try {
|
||||
this.conf =
|
||||
new CapacitySchedulerConfiguration(rmContext
|
||||
.getConfigurationProvider().getConfiguration(configuration,
|
||||
YarnConfiguration.CS_CONFIGURATION_FILE), false);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
this.rmContext = rmContext;
|
||||
this.conf = loadCapacitySchedulerConfiguration(configuration);
|
||||
validateConf(this.conf);
|
||||
this.minimumAllocation = this.conf.getMinimumAllocation();
|
||||
this.maximumAllocation = this.conf.getMaximumAllocation();
|
||||
|
@ -283,7 +271,6 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
this.usePortForNodeName = this.conf.getUsePortForNodeName();
|
||||
this.applications =
|
||||
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
|
||||
this.rmContext = rmContext;
|
||||
|
||||
initializeQueues(this.conf);
|
||||
|
||||
|
@ -294,10 +281,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
"maximumAllocation=<" + getMaximumResourceCapability() + ">");
|
||||
} else {
|
||||
CapacitySchedulerConfiguration oldConf = this.conf;
|
||||
this.conf =
|
||||
new CapacitySchedulerConfiguration(conf,
|
||||
rmContext.getConfigurationProvider() instanceof
|
||||
LocalConfigurationProvider);
|
||||
this.conf = loadCapacitySchedulerConfiguration(configuration);
|
||||
validateConf(this.conf);
|
||||
try {
|
||||
LOG.info("Re-initializing queues...");
|
||||
|
@ -1042,4 +1026,16 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
queue.collectSchedulerApplications(apps);
|
||||
return apps;
|
||||
}
|
||||
|
||||
private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration(
|
||||
Configuration configuration) throws IOException {
|
||||
try {
|
||||
configuration.addResource(this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(configuration,
|
||||
YarnConfiguration.CS_CONFIGURATION_FILE));
|
||||
return new CapacitySchedulerConfiguration(configuration, false);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,8 +24,10 @@ import java.io.DataOutputStream;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
|
@ -41,6 +43,7 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
|
@ -452,6 +455,69 @@ public class TestRMAdminService {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshNodesWithLocalConfigurationProvider() {
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
|
||||
try {
|
||||
rm.adminService.refreshNodes(RefreshNodesRequest.newInstance());
|
||||
} catch (Exception ex) {
|
||||
fail("Using localConfigurationProvider. Should not get any exception.");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshNodesWithFileSystemBasedConfigurationProvider()
|
||||
throws IOException, YarnException {
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
|
||||
try {
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
fail("Should throw an exception");
|
||||
} catch (Exception ex) {
|
||||
// Expect exception here
|
||||
}
|
||||
|
||||
// upload default configurations
|
||||
uploadDefaultConfiguration();
|
||||
|
||||
try {
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
} catch (Exception ex) {
|
||||
fail("Should not get any exceptions");
|
||||
}
|
||||
|
||||
final File excludeHostsFile = new File(tmpDir.toString(), "excludeHosts");
|
||||
if (excludeHostsFile.exists()) {
|
||||
excludeHostsFile.delete();
|
||||
}
|
||||
if (!excludeHostsFile.createNewFile()) {
|
||||
Assert.fail("Can not create " + "excludeHosts");
|
||||
}
|
||||
PrintWriter fileWriter = new PrintWriter(excludeHostsFile);
|
||||
fileWriter.write("0.0.0.0:123");
|
||||
fileWriter.close();
|
||||
|
||||
uploadToRemoteFileSystem(new Path(excludeHostsFile.getAbsolutePath()));
|
||||
|
||||
Configuration yarnConf = new YarnConfiguration();
|
||||
yarnConf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, this.workingPath
|
||||
+ "/excludeHosts");
|
||||
uploadConfiguration(yarnConf, YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
|
||||
|
||||
rm.adminService.refreshNodes(RefreshNodesRequest.newInstance());
|
||||
Set<String> excludeHosts =
|
||||
rm.getNodesListManager().getHostsReader().getExcludedHosts();
|
||||
Assert.assertTrue(excludeHosts.size() == 1);
|
||||
Assert.assertTrue(excludeHosts.contains("0.0.0.0:123"));
|
||||
}
|
||||
|
||||
private String writeConfigurationXML(Configuration conf, String confXMLName)
|
||||
throws IOException {
|
||||
DataOutputStream output = null;
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
</configuration>
|
|
@ -0,0 +1,18 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
</configuration>
|
Loading…
Reference in New Issue