diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java index 3532d13f4a9..156468e4f48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java @@ -92,4 +92,12 @@ public synchronized void initInternal(Configuration bootstrapConf) public synchronized void closeInternal() throws Exception { fs.close(); } + + public FileSystem getFs() { + return fs; + } + + public Path getConfigDir() { + return configDir; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 36f831dca94..ee91b0c3825 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -3396,6 +3396,10 @@ public MutableConfigurationProvider getMutableConfProvider() { return null; } + public CSConfigurationProvider getCsConfProvider() { + return csConfProvider; + } + @Override public void resetSchedulerMetrics() { CapacitySchedulerMetrics.destroy(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 8e605964e48..b66ab85733e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2200,6 +2200,18 @@ public void setAutoCreatedQueuesV2MaxChildQueuesLimit(String queuePath, public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = 1500L; + /** + * Time in milliseconds between invocations + * of QueueConfigurationAutoRefreshPolicy. + */ + @Private + public static final String QUEUE_AUTO_REFRESH_MONITORING_INTERVAL = + PREFIX + "queue.auto.refresh.monitoring-interval"; + + @Private + public static final long DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL = + 5000L; + /** * Queue Management computation policy for Auto Created queues * @param queue The queue's path diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java new file mode 100644 index 00000000000..0ae0777e801 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.MonotonicClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.io.IOException; + + +/** + * Queue auto refresh policy for queues. + */ +public class QueueConfigurationAutoRefreshPolicy + implements SchedulingEditPolicy { + + private static final Logger LOG = + LoggerFactory.getLogger(QueueConfigurationAutoRefreshPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + private RMNodeLabelsManager nlm; + + private long monitoringInterval; + private long lastModified; + + // Last time we attempt to reload queues + // included successful and failed case. + private long lastReloadAttempt; + private boolean lastReloadAttemptFailed = false; + + // Path to XML file containing allocations. + private Path allocCsFile; + private FileSystem fs; + + /** + * Instantiated by CapacitySchedulerConfiguration. + */ + public QueueConfigurationAutoRefreshPolicy() { + clock = new MonotonicClock(); + } + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Queue auto refresh Policy monitor: {}" + this. + getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_AUTO_REFRESH_MONITORING_INTERVAL, + CapacitySchedulerConfiguration. + DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL); + } + + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + try { + + // Support both FileSystemBased and LocalFile based + if (rmContext.getYarnConfiguration(). + get(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS). + equals(FileSystemBasedConfigurationProvider + .class.getCanonicalName())) { + allocCsFile = new Path(rmContext.getYarnConfiguration(). + get(YarnConfiguration.FS_BASED_RM_CONF_STORE), + YarnConfiguration.CS_CONFIGURATION_FILE); + } else { + allocCsFile = new Path(rmContext.getYarnConfiguration() + .getClassLoader().getResource("").toString(), + YarnConfiguration.CS_CONFIGURATION_FILE); + } + + // Check if the cs related conf modified + fs = allocCsFile.getFileSystem(rmContext.getYarnConfiguration()); + + lastModified = + fs.getFileStatus(allocCsFile).getModificationTime(); + + long time = clock.getTime(); + + if (lastModified > lastReloadAttempt && + time > lastReloadAttempt + monitoringInterval) { + try { + rmContext.getRMAdminService().refreshQueues(); + LOG.info("Queue auto refresh completed successfully"); + lastReloadAttempt = clock.getTime(); + } catch (IOException | YarnException e) { + LOG.error("Can't refresh queue: " + e); + if (!lastReloadAttemptFailed) { + LOG.error("Failed to reload capacity scheduler config file - " + + "will use existing conf.", e.getMessage()); + } + lastReloadAttempt = clock.getTime(); + lastReloadAttemptFailed = true; + } + + } else if (lastModified == 0L) { + if (!lastReloadAttemptFailed) { + LOG.warn("Failed to reload capacity scheduler config file because" + + " last modified returned 0. File exists: " + + fs.exists(allocCsFile)); + } + lastReloadAttemptFailed = true; + } + + } catch (IOException e) { + LOG.error("Can't get file status for refresh : " + e); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + @VisibleForTesting + long getLastReloadAttempt() { + return lastReloadAttempt; + } + + @VisibleForTesting + long getLastModified() { + return lastModified; + } + + @VisibleForTesting + Clock getClock() { + return clock; + } + + @VisibleForTesting + boolean getLastReloadAttemptFailed() { + return lastReloadAttemptFailed; + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return QueueConfigurationAutoRefreshPolicy.class.getCanonicalName(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueConfigurationAutoRefreshPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueConfigurationAutoRefreshPolicy.java new file mode 100644 index 00000000000..f4a5a2103d6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueConfigurationAutoRefreshPolicy.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider; +import org.apache.hadoop.yarn.LocalConfigurationProvider; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +public class TestQueueConfigurationAutoRefreshPolicy { + + private Configuration configuration; + private MockRM rm = null; + private FileSystem fs; + private Path workingPath; + private Path workingPathRecover; + private Path fileSystemWorkingPath; + private Path tmpDir; + private QueueConfigurationAutoRefreshPolicy policy; + + static { + YarnConfiguration.addDefaultResource( + YarnConfiguration.CS_CONFIGURATION_FILE); + YarnConfiguration.addDefaultResource( + YarnConfiguration.DR_CONFIGURATION_FILE); + } + + @Before + public void setup() throws IOException { + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.setMiniClusterMode(true); + + configuration = new YarnConfiguration(); + configuration.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getCanonicalName()); + fs = FileSystem.get(configuration); + workingPath = new Path(QueueConfigurationAutoRefreshPolicy. + class.getClassLoader(). + getResource(".").toString()); + workingPathRecover = new Path(QueueConfigurationAutoRefreshPolicy. + class.getClassLoader(). + getResource(".").toString() + "/" + "Recover"); + fileSystemWorkingPath = + new Path(new File("target", this.getClass().getSimpleName() + + "-remoteDir").getAbsolutePath()); + + tmpDir = new Path(new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsolutePath()); + fs.delete(fileSystemWorkingPath, true); + fs.mkdirs(fileSystemWorkingPath); + fs.delete(tmpDir, true); + fs.mkdirs(tmpDir); + + policy = + new QueueConfigurationAutoRefreshPolicy(); + } + + private String writeConfigurationXML(Configuration conf, String confXMLName) + throws IOException { + DataOutputStream output = null; + try { + final File confFile = new File(tmpDir.toString(), confXMLName); + if (confFile.exists()) { + confFile.delete(); + } + if (!confFile.createNewFile()) { + Assert.fail("Can not create " + confXMLName); + } + output = new DataOutputStream( + new FileOutputStream(confFile)); + conf.writeXml(output); + return confFile.getAbsolutePath(); + } finally { + if (output != null) { + output.close(); + } + } + } + + private void uploadConfiguration(Boolean isFileSystemBased, + Configuration conf, String confFileName) + throws IOException { + String csConfFile = writeConfigurationXML(conf, confFileName); + if (isFileSystemBased) { + // upload the file into Remote File System + uploadToRemoteFileSystem(new Path(csConfFile), + fileSystemWorkingPath); + } else { + // upload the file into Work Path for Local File + uploadToRemoteFileSystem(new Path(csConfFile), + workingPath); + } + } + + private void uploadToRemoteFileSystem(Path filePath, Path remotePath) + throws IOException { + fs.copyFromLocalFile(filePath, remotePath); + } + + private void uploadDefaultConfiguration(Boolean + isFileSystemBased) throws IOException { + Configuration conf = new Configuration(); + uploadConfiguration(isFileSystemBased, + conf, "core-site.xml"); + + YarnConfiguration yarnConf = new YarnConfiguration(); + + uploadConfiguration(isFileSystemBased, + yarnConf, "yarn-site.xml"); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + uploadConfiguration(isFileSystemBased, + csConf, "capacity-scheduler.xml"); + + Configuration hadoopPolicyConf = new Configuration(false); + hadoopPolicyConf + .addResource(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); + uploadConfiguration(isFileSystemBased, + hadoopPolicyConf, "hadoop-policy.xml"); + } + + @Test + public void testFileSystemBasedEditSchedule() throws Exception { + // Test FileSystemBasedConfigurationProvider scheduled + testCommon(true); + } + + @Test + public void testLocalFileBasedEditSchedule() throws Exception { + // Prepare for recover for local file default. + fs.mkdirs(workingPath); + fs.copyFromLocalFile(new Path(workingPath.toString() + + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE), + new Path(workingPathRecover.toString() + + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)); + + fs.copyFromLocalFile(new Path(workingPath.toString() + + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE), + new Path(workingPathRecover.toString() + + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)); + + fs.copyFromLocalFile(new Path(workingPath.toString() + + "/" + YarnConfiguration.CS_CONFIGURATION_FILE), + new Path(workingPathRecover.toString() + + "/" + YarnConfiguration.CS_CONFIGURATION_FILE)); + + // Test LocalConfigurationProvider scheduled + testCommon(false); + + // Recover for recover for local file default. + fs.copyFromLocalFile(new Path(workingPathRecover.toString() + + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE), + new Path(workingPath.toString() + + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)); + + fs.copyFromLocalFile(new Path(workingPathRecover.toString() + + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE), + new Path(workingPath.toString() + + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)); + + fs.copyFromLocalFile(new Path(workingPathRecover.toString() + + "/" + YarnConfiguration.CS_CONFIGURATION_FILE), + new Path(workingPath.toString() + + "/" + YarnConfiguration.CS_CONFIGURATION_FILE)); + + fs.delete(workingPathRecover, true); + } + + public void testCommon(Boolean isFileSystemBased) throws Exception { + + // Set auto refresh interval to 1s + configuration.setLong(CapacitySchedulerConfiguration. + QUEUE_AUTO_REFRESH_MONITORING_INTERVAL, + 1000L); + + if (isFileSystemBased) { + configuration.set(YarnConfiguration.FS_BASED_RM_CONF_STORE, + fileSystemWorkingPath.toString()); + } + + //upload default configurations + uploadDefaultConfiguration(isFileSystemBased); + + if (isFileSystemBased) { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + FileSystemBasedConfigurationProvider.class.getCanonicalName()); + } else { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + LocalConfigurationProvider.class.getCanonicalName()); + } + + // upload the auto refresh related configurations + uploadConfiguration(isFileSystemBased, + configuration, "yarn-site.xml"); + uploadConfiguration(isFileSystemBased, + configuration, "capacity-scheduler.xml"); + + rm = new MockRM(configuration); + rm.init(configuration); + policy.init(configuration, + rm.getRMContext(), + rm.getResourceScheduler()); + rm.start(); + + CapacityScheduler cs = + (CapacityScheduler) rm.getRMContext().getScheduler(); + + int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications(); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setInt(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, + 5000); + uploadConfiguration(isFileSystemBased, + csConf, "capacity-scheduler.xml"); + + // Refreshed first time. + policy.editSchedule(); + + // Make sure refresh successfully. + Assert.assertFalse(policy.getLastReloadAttemptFailed()); + long oldModified = policy.getLastModified(); + long oldSuccess = policy.getLastReloadAttempt(); + + Assert.assertTrue(oldSuccess > oldModified); + + int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications(); + Assert.assertEquals(maxAppsAfter, 5000); + Assert.assertTrue(maxAppsAfter != maxAppsBefore); + + // Trigger interval for refresh. + GenericTestUtils.waitFor(() -> (policy.getClock().getTime() - + policy.getLastReloadAttempt()) / 1000 > 1, + 500, 3000); + + // Upload for modified. + csConf.setInt(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, + 3000); + uploadConfiguration(isFileSystemBased, + csConf, "capacity-scheduler.xml"); + + policy.editSchedule(); + // Wait for triggered refresh. + GenericTestUtils.waitFor(() -> policy.getLastReloadAttempt() > + policy.getLastModified(), + 500, 3000); + + // Make sure refresh successfully. + Assert.assertFalse(policy.getLastReloadAttemptFailed()); + oldModified = policy.getLastModified(); + oldSuccess = policy.getLastReloadAttempt(); + Assert.assertTrue(oldSuccess > oldModified); + Assert.assertEquals(cs.getConfiguration(). + getMaximumSystemApplications(), 3000); + + // Trigger interval for refresh. + GenericTestUtils.waitFor(() -> (policy.getClock().getTime() - + policy.getLastReloadAttempt()) / 1000 > 1, + 500, 3000); + + // Without modified + policy.editSchedule(); + Assert.assertEquals(oldModified, + policy.getLastModified()); + Assert.assertEquals(oldSuccess, + policy.getLastReloadAttempt()); + } + + @After + public void tearDown() throws IOException { + if (rm != null) { + rm.stop(); + } + fs.delete(fileSystemWorkingPath, true); + fs.delete(tmpDir, true); + } +}