YARN-10623. Capacity scheduler should support refresh queue automatically by a thread policy. Contributed by Qi Zhu.

This commit is contained in:
Peter Bacsko 2021-03-04 12:23:11 +01:00
parent d615e2d3bd
commit a85aeee876
5 changed files with 528 additions and 0 deletions

View File

@ -92,4 +92,12 @@ public class FileSystemBasedConfigurationProvider
public synchronized void closeInternal() throws Exception { public synchronized void closeInternal() throws Exception {
fs.close(); fs.close();
} }
public FileSystem getFs() {
return fs;
}
public Path getConfigDir() {
return configDir;
}
} }

View File

@ -3396,6 +3396,10 @@ public class CapacityScheduler extends
return null; return null;
} }
public CSConfigurationProvider getCsConfProvider() {
return csConfProvider;
}
@Override @Override
public void resetSchedulerMetrics() { public void resetSchedulerMetrics() {
CapacitySchedulerMetrics.destroy(); CapacitySchedulerMetrics.destroy();

View File

@ -2200,6 +2200,18 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL =
1500L; 1500L;
/**
* Time in milliseconds between invocations
* of QueueConfigurationAutoRefreshPolicy.
*/
@Private
public static final String QUEUE_AUTO_REFRESH_MONITORING_INTERVAL =
PREFIX + "queue.auto.refresh.monitoring-interval";
@Private
public static final long DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL =
5000L;
/** /**
* Queue Management computation policy for Auto Created queues * Queue Management computation policy for Auto Created queues
* @param queue The queue's path * @param queue The queue's path

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.io.IOException;
/**
* Queue auto refresh policy for queues.
*/
public class QueueConfigurationAutoRefreshPolicy
implements SchedulingEditPolicy {
private static final Logger LOG =
LoggerFactory.getLogger(QueueConfigurationAutoRefreshPolicy.class);
private Clock clock;
// Pointer to other RM components
private RMContext rmContext;
private ResourceCalculator rc;
private CapacityScheduler scheduler;
private RMNodeLabelsManager nlm;
private long monitoringInterval;
private long lastModified;
// Last time we attempt to reload queues
// included successful and failed case.
private long lastReloadAttempt;
private boolean lastReloadAttemptFailed = false;
// Path to XML file containing allocations.
private Path allocCsFile;
private FileSystem fs;
/**
* Instantiated by CapacitySchedulerConfiguration.
*/
public QueueConfigurationAutoRefreshPolicy() {
clock = new MonotonicClock();
}
@Override
public void init(final Configuration config, final RMContext context,
final ResourceScheduler sched) {
LOG.info("Queue auto refresh Policy monitor: {}" + this.
getClass().getCanonicalName());
assert null == scheduler : "Unexpected duplicate call to init";
if (!(sched instanceof CapacityScheduler)) {
throw new YarnRuntimeException("Class " +
sched.getClass().getCanonicalName() + " not instance of " +
CapacityScheduler.class.getCanonicalName());
}
rmContext = context;
scheduler = (CapacityScheduler) sched;
clock = scheduler.getClock();
rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager();
CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration();
monitoringInterval = csConfig.getLong(
CapacitySchedulerConfiguration.QUEUE_AUTO_REFRESH_MONITORING_INTERVAL,
CapacitySchedulerConfiguration.
DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL);
}
@Override
public void editSchedule() {
long startTs = clock.getTime();
try {
// Support both FileSystemBased and LocalFile based
if (rmContext.getYarnConfiguration().
get(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS).
equals(FileSystemBasedConfigurationProvider
.class.getCanonicalName())) {
allocCsFile = new Path(rmContext.getYarnConfiguration().
get(YarnConfiguration.FS_BASED_RM_CONF_STORE),
YarnConfiguration.CS_CONFIGURATION_FILE);
} else {
allocCsFile = new Path(rmContext.getYarnConfiguration()
.getClassLoader().getResource("").toString(),
YarnConfiguration.CS_CONFIGURATION_FILE);
}
// Check if the cs related conf modified
fs = allocCsFile.getFileSystem(rmContext.getYarnConfiguration());
lastModified =
fs.getFileStatus(allocCsFile).getModificationTime();
long time = clock.getTime();
if (lastModified > lastReloadAttempt &&
time > lastReloadAttempt + monitoringInterval) {
try {
rmContext.getRMAdminService().refreshQueues();
LOG.info("Queue auto refresh completed successfully");
lastReloadAttempt = clock.getTime();
} catch (IOException | YarnException e) {
LOG.error("Can't refresh queue: " + e);
if (!lastReloadAttemptFailed) {
LOG.error("Failed to reload capacity scheduler config file - " +
"will use existing conf.", e.getMessage());
}
lastReloadAttempt = clock.getTime();
lastReloadAttemptFailed = true;
}
} else if (lastModified == 0L) {
if (!lastReloadAttemptFailed) {
LOG.warn("Failed to reload capacity scheduler config file because" +
" last modified returned 0. File exists: "
+ fs.exists(allocCsFile));
}
lastReloadAttemptFailed = true;
}
} catch (IOException e) {
LOG.error("Can't get file status for refresh : " + e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
}
}
@VisibleForTesting
long getLastReloadAttempt() {
return lastReloadAttempt;
}
@VisibleForTesting
long getLastModified() {
return lastModified;
}
@VisibleForTesting
Clock getClock() {
return clock;
}
@VisibleForTesting
boolean getLastReloadAttemptFailed() {
return lastReloadAttemptFailed;
}
@Override
public long getMonitoringInterval() {
return monitoringInterval;
}
@Override
public String getPolicyName() {
return QueueConfigurationAutoRefreshPolicy.class.getCanonicalName();
}
}

View File

@ -0,0 +1,308 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
public class TestQueueConfigurationAutoRefreshPolicy {
private Configuration configuration;
private MockRM rm = null;
private FileSystem fs;
private Path workingPath;
private Path workingPathRecover;
private Path fileSystemWorkingPath;
private Path tmpDir;
private QueueConfigurationAutoRefreshPolicy policy;
static {
YarnConfiguration.addDefaultResource(
YarnConfiguration.CS_CONFIGURATION_FILE);
YarnConfiguration.addDefaultResource(
YarnConfiguration.DR_CONFIGURATION_FILE);
}
@Before
public void setup() throws IOException {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.setMiniClusterMode(true);
configuration = new YarnConfiguration();
configuration.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName());
fs = FileSystem.get(configuration);
workingPath = new Path(QueueConfigurationAutoRefreshPolicy.
class.getClassLoader().
getResource(".").toString());
workingPathRecover = new Path(QueueConfigurationAutoRefreshPolicy.
class.getClassLoader().
getResource(".").toString() + "/" + "Recover");
fileSystemWorkingPath =
new Path(new File("target", this.getClass().getSimpleName()
+ "-remoteDir").getAbsolutePath());
tmpDir = new Path(new File("target", this.getClass().getSimpleName()
+ "-tmpDir").getAbsolutePath());
fs.delete(fileSystemWorkingPath, true);
fs.mkdirs(fileSystemWorkingPath);
fs.delete(tmpDir, true);
fs.mkdirs(tmpDir);
policy =
new QueueConfigurationAutoRefreshPolicy();
}
private String writeConfigurationXML(Configuration conf, String confXMLName)
throws IOException {
DataOutputStream output = null;
try {
final File confFile = new File(tmpDir.toString(), confXMLName);
if (confFile.exists()) {
confFile.delete();
}
if (!confFile.createNewFile()) {
Assert.fail("Can not create " + confXMLName);
}
output = new DataOutputStream(
new FileOutputStream(confFile));
conf.writeXml(output);
return confFile.getAbsolutePath();
} finally {
if (output != null) {
output.close();
}
}
}
private void uploadConfiguration(Boolean isFileSystemBased,
Configuration conf, String confFileName)
throws IOException {
String csConfFile = writeConfigurationXML(conf, confFileName);
if (isFileSystemBased) {
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(csConfFile),
fileSystemWorkingPath);
} else {
// upload the file into Work Path for Local File
uploadToRemoteFileSystem(new Path(csConfFile),
workingPath);
}
}
private void uploadToRemoteFileSystem(Path filePath, Path remotePath)
throws IOException {
fs.copyFromLocalFile(filePath, remotePath);
}
private void uploadDefaultConfiguration(Boolean
isFileSystemBased) throws IOException {
Configuration conf = new Configuration();
uploadConfiguration(isFileSystemBased,
conf, "core-site.xml");
YarnConfiguration yarnConf = new YarnConfiguration();
uploadConfiguration(isFileSystemBased,
yarnConf, "yarn-site.xml");
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
uploadConfiguration(isFileSystemBased,
csConf, "capacity-scheduler.xml");
Configuration hadoopPolicyConf = new Configuration(false);
hadoopPolicyConf
.addResource(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
uploadConfiguration(isFileSystemBased,
hadoopPolicyConf, "hadoop-policy.xml");
}
@Test
public void testFileSystemBasedEditSchedule() throws Exception {
// Test FileSystemBasedConfigurationProvider scheduled
testCommon(true);
}
@Test
public void testLocalFileBasedEditSchedule() throws Exception {
// Prepare for recover for local file default.
fs.mkdirs(workingPath);
fs.copyFromLocalFile(new Path(workingPath.toString()
+ "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE),
new Path(workingPathRecover.toString()
+ "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE));
fs.copyFromLocalFile(new Path(workingPath.toString()
+ "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE),
new Path(workingPathRecover.toString()
+ "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE));
fs.copyFromLocalFile(new Path(workingPath.toString()
+ "/" + YarnConfiguration.CS_CONFIGURATION_FILE),
new Path(workingPathRecover.toString()
+ "/" + YarnConfiguration.CS_CONFIGURATION_FILE));
// Test LocalConfigurationProvider scheduled
testCommon(false);
// Recover for recover for local file default.
fs.copyFromLocalFile(new Path(workingPathRecover.toString()
+ "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE),
new Path(workingPath.toString()
+ "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE));
fs.copyFromLocalFile(new Path(workingPathRecover.toString()
+ "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE),
new Path(workingPath.toString()
+ "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE));
fs.copyFromLocalFile(new Path(workingPathRecover.toString()
+ "/" + YarnConfiguration.CS_CONFIGURATION_FILE),
new Path(workingPath.toString()
+ "/" + YarnConfiguration.CS_CONFIGURATION_FILE));
fs.delete(workingPathRecover, true);
}
public void testCommon(Boolean isFileSystemBased) throws Exception {
// Set auto refresh interval to 1s
configuration.setLong(CapacitySchedulerConfiguration.
QUEUE_AUTO_REFRESH_MONITORING_INTERVAL,
1000L);
if (isFileSystemBased) {
configuration.set(YarnConfiguration.FS_BASED_RM_CONF_STORE,
fileSystemWorkingPath.toString());
}
//upload default configurations
uploadDefaultConfiguration(isFileSystemBased);
if (isFileSystemBased) {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
FileSystemBasedConfigurationProvider.class.getCanonicalName());
} else {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class.getCanonicalName());
}
// upload the auto refresh related configurations
uploadConfiguration(isFileSystemBased,
configuration, "yarn-site.xml");
uploadConfiguration(isFileSystemBased,
configuration, "capacity-scheduler.xml");
rm = new MockRM(configuration);
rm.init(configuration);
policy.init(configuration,
rm.getRMContext(),
rm.getResourceScheduler());
rm.start();
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setInt(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
5000);
uploadConfiguration(isFileSystemBased,
csConf, "capacity-scheduler.xml");
// Refreshed first time.
policy.editSchedule();
// Make sure refresh successfully.
Assert.assertFalse(policy.getLastReloadAttemptFailed());
long oldModified = policy.getLastModified();
long oldSuccess = policy.getLastReloadAttempt();
Assert.assertTrue(oldSuccess > oldModified);
int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxAppsAfter, 5000);
Assert.assertTrue(maxAppsAfter != maxAppsBefore);
// Trigger interval for refresh.
GenericTestUtils.waitFor(() -> (policy.getClock().getTime() -
policy.getLastReloadAttempt()) / 1000 > 1,
500, 3000);
// Upload for modified.
csConf.setInt(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
3000);
uploadConfiguration(isFileSystemBased,
csConf, "capacity-scheduler.xml");
policy.editSchedule();
// Wait for triggered refresh.
GenericTestUtils.waitFor(() -> policy.getLastReloadAttempt() >
policy.getLastModified(),
500, 3000);
// Make sure refresh successfully.
Assert.assertFalse(policy.getLastReloadAttemptFailed());
oldModified = policy.getLastModified();
oldSuccess = policy.getLastReloadAttempt();
Assert.assertTrue(oldSuccess > oldModified);
Assert.assertEquals(cs.getConfiguration().
getMaximumSystemApplications(), 3000);
// Trigger interval for refresh.
GenericTestUtils.waitFor(() -> (policy.getClock().getTime() -
policy.getLastReloadAttempt()) / 1000 > 1,
500, 3000);
// Without modified
policy.editSchedule();
Assert.assertEquals(oldModified,
policy.getLastModified());
Assert.assertEquals(oldSuccess,
policy.getLastReloadAttempt());
}
@After
public void tearDown() throws IOException {
if (rm != null) {
rm.stop();
}
fs.delete(fileSystemWorkingPath, true);
fs.delete(tmpDir, true);
}
}