YARN-1611. Introduced the concept of a configuration provider which can be used by ResourceManager to read configuration locally or from remote systems so as to help RM failover. Contributed by Xuan Gong.
svn merge -c 1564002 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1564003 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ef4cbe7c15
commit
d083be9bc5
|
@ -59,6 +59,10 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1633. Defined user-facing entity, entity-info and event objects related
|
||||
to Application Timeline feature. (Zhijie Shen via vinodkv)
|
||||
|
||||
YARN-1611. Introduced the concept of a configuration provider which can be
|
||||
used by ResourceManager to read configuration locally or from remote systems
|
||||
so as to help RM failover. (Xuan Gong via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.conf;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
/**
|
||||
* Base class to implement ConfigurationProvider.
|
||||
* Real ConfigurationProvider implementations need to derive from it and
|
||||
* implement load methods to actually load the configuration.
|
||||
*/
|
||||
public abstract class ConfigurationProvider {
|
||||
|
||||
public void init(Configuration conf) throws Exception {
|
||||
initInternal(conf);
|
||||
}
|
||||
|
||||
public void close() throws Exception {
|
||||
closeInternal();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the configuration.
|
||||
* @param name The configuration file name
|
||||
* @return configuration
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Configuration getConfiguration(String name)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* Derived classes initialize themselves using this method.
|
||||
*/
|
||||
public abstract void initInternal(Configuration conf) throws Exception;
|
||||
|
||||
/**
|
||||
* Derived classes close themselves using this method.
|
||||
*/
|
||||
public abstract void closeInternal() throws Exception;
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.conf;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
/**
|
||||
* Factory for {@link ConfigurationProvider} implementations.
|
||||
*/
|
||||
public class ConfigurationProviderFactory {
|
||||
/**
|
||||
* Creates an instance of {@link ConfigurationProvider} using given
|
||||
* configuration.
|
||||
* @param conf
|
||||
* @return configurationProvider
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static ConfigurationProvider
|
||||
getConfigurationProvider(Configuration conf) {
|
||||
Class<? extends ConfigurationProvider> defaultProviderClass;
|
||||
try {
|
||||
defaultProviderClass = (Class<? extends ConfigurationProvider>)
|
||||
Class.forName(
|
||||
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
|
||||
} catch (Exception e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Invalid default configuration provider class"
|
||||
+ YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
|
||||
}
|
||||
ConfigurationProvider configurationProvider = ReflectionUtils.newInstance(
|
||||
conf.getClass(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
defaultProviderClass, ConfigurationProvider.class), conf);
|
||||
return configurationProvider;
|
||||
}
|
||||
}
|
|
@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|||
@Evolving
|
||||
public class YarnConfiguration extends Configuration {
|
||||
|
||||
@Private
|
||||
public static final String CS_CONFIGURATION_FILE= "capacity-scheduler.xml";
|
||||
|
||||
private static final String YARN_DEFAULT_XML_FILE = "yarn-default.xml";
|
||||
private static final String YARN_SITE_XML_FILE = "yarn-site.xml";
|
||||
|
||||
|
@ -329,6 +332,16 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
|
||||
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
|
||||
|
||||
/** Store the related configuration files in File System */
|
||||
public static final String FS_BASED_RM_CONF_STORE = RM_PREFIX
|
||||
+ "configuration.file-system-based-store";
|
||||
public static final String DEFAULT_FS_BASED_RM_CONF_STORE = "/yarn/conf";
|
||||
|
||||
public static final String RM_CONFIGURATION_PROVIDER_CLASS = RM_PREFIX
|
||||
+ "configuration.provider-class";
|
||||
public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS =
|
||||
"org.apache.hadoop.yarn.LocalConfigurationProvider";
|
||||
|
||||
@Private
|
||||
public static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class FileSystemBasedConfigurationProvider
|
||||
extends ConfigurationProvider {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(FileSystemBasedConfigurationProvider.class);
|
||||
private FileSystem fs;
|
||||
private Path configDir;
|
||||
|
||||
@Override
|
||||
public synchronized Configuration getConfiguration(String name)
|
||||
throws IOException, YarnException {
|
||||
Path configPath = new Path(this.configDir, name);
|
||||
if (!fs.exists(configPath)) {
|
||||
throw new YarnException("Can not find Configuration: " + name + " in "
|
||||
+ configDir);
|
||||
}
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.addResource(fs.open(configPath));
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void initInternal(Configuration conf) throws Exception {
|
||||
configDir =
|
||||
new Path(conf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
|
||||
YarnConfiguration.DEFAULT_FS_BASED_RM_CONF_STORE));
|
||||
fs = configDir.getFileSystem(conf);
|
||||
if (!fs.exists(configDir)) {
|
||||
fs.mkdirs(configDir);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void closeInternal() throws Exception {
|
||||
fs.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class LocalConfigurationProvider extends ConfigurationProvider {
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration(String name)
|
||||
throws IOException, YarnException {
|
||||
return new Configuration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initInternal(Configuration conf) throws Exception {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeInternal() throws Exception {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
|
@ -588,6 +588,18 @@
|
|||
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The class to use as the configuration provider.
|
||||
If org.apache.hadoop.yarn.LocalConfigurationProvider is used,
|
||||
the local configuration will be loaded.
|
||||
If org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider is used,
|
||||
the configuration which will be loaded should be uploaded to remote File system first.
|
||||
</description>>
|
||||
<name>yarn.resourcemanager.configuration.provider-class</name>
|
||||
<value>org.apache.hadoop.yarn.LocalConfigurationProvider</value>
|
||||
<!-- <value>org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider</value> -->
|
||||
</property>
|
||||
|
||||
<!-- Node Manager Configs -->
|
||||
<property>
|
||||
<description>The hostname of the NM.</description>
|
||||
|
|
|
@ -47,6 +47,8 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
|
|||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -89,6 +91,8 @@ public class AdminService extends CompositeService implements
|
|||
private InetSocketAddress masterServiceAddress;
|
||||
private AccessControlList adminAcl;
|
||||
|
||||
private ConfigurationProvider configurationProvider = null;
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
|
@ -109,6 +113,10 @@ public class AdminService extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
this.configurationProvider =
|
||||
ConfigurationProviderFactory.getConfigurationProvider(conf);
|
||||
configurationProvider.init(conf);
|
||||
|
||||
masterServiceAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||
|
@ -129,6 +137,9 @@ public class AdminService extends CompositeService implements
|
|||
@Override
|
||||
protected synchronized void serviceStop() throws Exception {
|
||||
stopServer();
|
||||
if (this.configurationProvider != null) {
|
||||
configurationProvider.close();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
@ -295,23 +306,28 @@ public class AdminService extends CompositeService implements
|
|||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws YarnException, StandbyException {
|
||||
UserGroupInformation user = checkAcls("refreshQueues");
|
||||
String argName = "refreshQueues";
|
||||
UserGroupInformation user = checkAcls(argName);
|
||||
|
||||
if (!isRMActive()) {
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"ResourceManager is not active. Can not refresh queues.");
|
||||
throwStandbyException();
|
||||
}
|
||||
|
||||
RefreshQueuesResponse response =
|
||||
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
|
||||
try {
|
||||
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues",
|
||||
Configuration conf =
|
||||
getConfiguration(YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||
rmContext.getScheduler().reinitialize(conf, this.rmContext);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
|
||||
"AdminService");
|
||||
return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
|
||||
return response;
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Exception refreshing queues ", ioe);
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"Exception refreshing queues");
|
||||
throw RPCUtil.getRemoteException(ioe);
|
||||
|
@ -484,4 +500,8 @@ public class AdminService extends CompositeService implements
|
|||
return response;
|
||||
}
|
||||
|
||||
private synchronized Configuration getConfiguration(String confFileName)
|
||||
throws YarnException, IOException {
|
||||
return this.configurationProvider.getConfiguration(confFileName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -195,6 +195,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
|
||||
private ResourceCalculator calculator;
|
||||
private boolean usePortForNodeName;
|
||||
private boolean useLocalConfigurationProvider;
|
||||
|
||||
public CapacityScheduler() {}
|
||||
|
||||
|
@ -261,7 +262,13 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
public synchronized void
|
||||
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
|
||||
if (!initialized) {
|
||||
this.conf = new CapacitySchedulerConfiguration(conf);
|
||||
this.useLocalConfigurationProvider = conf.get(
|
||||
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS).equals(
|
||||
"org.apache.hadoop.yarn.LocalConfigurationProvider");
|
||||
this.conf =
|
||||
new CapacitySchedulerConfiguration(conf,
|
||||
this.useLocalConfigurationProvider);
|
||||
validateConf(this.conf);
|
||||
this.minimumAllocation = this.conf.getMinimumAllocation();
|
||||
this.maximumAllocation = this.conf.getMaximumAllocation();
|
||||
|
@ -279,9 +286,10 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
|
||||
"maximumAllocation=<" + getMaximumResourceCapability() + ">");
|
||||
} else {
|
||||
|
||||
CapacitySchedulerConfiguration oldConf = this.conf;
|
||||
this.conf = new CapacitySchedulerConfiguration(conf);
|
||||
this.conf =
|
||||
new CapacitySchedulerConfiguration(conf,
|
||||
this.useLocalConfigurationProvider);
|
||||
validateConf(this.conf);
|
||||
try {
|
||||
LOG.info("Re-initializing queues...");
|
||||
|
|
|
@ -140,9 +140,16 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
|||
}
|
||||
|
||||
public CapacitySchedulerConfiguration(Configuration configuration) {
|
||||
this(configuration, true);
|
||||
}
|
||||
|
||||
public CapacitySchedulerConfiguration(Configuration configuration,
|
||||
boolean useLocalConfigurationProvider) {
|
||||
super(configuration);
|
||||
if (useLocalConfigurationProvider) {
|
||||
addResource(CS_CONFIGURATION_FILE);
|
||||
}
|
||||
}
|
||||
|
||||
private String getQueuePrefix(String queue) {
|
||||
String queueName = PREFIX + queue + DOT;
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class TestRMAdminService {
|
||||
|
||||
private final Configuration configuration = new YarnConfiguration();
|
||||
private MockRM rm = null;
|
||||
private FileSystem fs;
|
||||
private Path workingPath;
|
||||
private Path tmpDir;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
fs = FileSystem.get(configuration);
|
||||
workingPath =
|
||||
new Path(new File("target", this.getClass().getSimpleName()
|
||||
+ "-remoteDir").getAbsolutePath());
|
||||
configuration.set(YarnConfiguration.FS_BASED_RM_CONF_STORE,
|
||||
workingPath.toString());
|
||||
tmpDir = new Path(new File("target", this.getClass().getSimpleName()
|
||||
+ "-tmpDir").getAbsolutePath());
|
||||
fs.delete(workingPath, true);
|
||||
fs.delete(tmpDir, true);
|
||||
fs.mkdirs(workingPath);
|
||||
fs.mkdirs(tmpDir);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
}
|
||||
fs.delete(workingPath, true);
|
||||
fs.delete(tmpDir, true);
|
||||
}
|
||||
@Test
|
||||
public void testAdminRefreshQueuesWithLocalConfigurationProvider()
|
||||
throws IOException, YarnException {
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs =
|
||||
(CapacityScheduler) rm.getRMContext().getScheduler();
|
||||
int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
|
||||
|
||||
try {
|
||||
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
|
||||
Assert.assertEquals(maxAppsBefore, cs.getConfiguration()
|
||||
.getMaximumSystemApplications());
|
||||
} catch (Exception ex) {
|
||||
fail("Using localConfigurationProvider. Should not get any exception.");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider()
|
||||
throws IOException, YarnException {
|
||||
Configuration.addDefaultResource(YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
|
||||
// clean the remoteDirectory
|
||||
cleanRemoteDirectory();
|
||||
|
||||
CapacityScheduler cs =
|
||||
(CapacityScheduler) rm.getRMContext().getScheduler();
|
||||
int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
|
||||
|
||||
try {
|
||||
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
|
||||
fail("FileSystemBasedConfigurationProvider is used." +
|
||||
" Should get an exception here");
|
||||
} catch (Exception ex) {
|
||||
Assert.assertTrue(ex.getMessage().contains(
|
||||
"Can not find Configuration: capacity-scheduler.xml"));
|
||||
}
|
||||
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
|
||||
String csConfFile = writeConfigurationXML(csConf,
|
||||
"capacity-scheduler.xml");
|
||||
|
||||
// upload the file into Remote File System
|
||||
uploadToRemoteFileSystem(new Path(csConfFile));
|
||||
|
||||
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
|
||||
|
||||
int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications();
|
||||
Assert.assertEquals(maxAppsAfter, 5000);
|
||||
Assert.assertTrue(maxAppsAfter != maxAppsBefore);
|
||||
}
|
||||
|
||||
private String writeConfigurationXML(Configuration conf, String confXMLName)
|
||||
throws IOException {
|
||||
DataOutputStream output = null;
|
||||
try {
|
||||
final File confFile = new File(tmpDir.toString(), confXMLName);
|
||||
if (confFile.exists()) {
|
||||
confFile.delete();
|
||||
}
|
||||
if (!confFile.createNewFile()) {
|
||||
Assert.fail("Can not create " + confXMLName);
|
||||
}
|
||||
output = new DataOutputStream(
|
||||
new FileOutputStream(confFile));
|
||||
conf.writeXml(output);
|
||||
return confFile.getAbsolutePath();
|
||||
} finally {
|
||||
if (output != null) {
|
||||
output.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadToRemoteFileSystem(Path filePath)
|
||||
throws IOException {
|
||||
fs.copyFromLocalFile(filePath, workingPath);
|
||||
}
|
||||
|
||||
private void cleanRemoteDirectory() throws IOException {
|
||||
if (fs.exists(workingPath)) {
|
||||
for (FileStatus file : fs.listStatus(workingPath)) {
|
||||
fs.delete(file.getPath(), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue