diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4f9c89ce1e5..63c34d429ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -108,6 +107,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; @@ -168,6 +169,8 @@ public class CapacityScheduler extends private int maxAssignPerHeartbeat; + private CSConfigurationProvider csConfProvider; + @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -290,7 +293,18 @@ public class CapacityScheduler extends IOException { try { writeLock.lock(); - this.conf = loadCapacitySchedulerConfiguration(configuration); + String confProviderStr = configuration.get( + CapacitySchedulerConfiguration.CS_CONF_PROVIDER, + CapacitySchedulerConfiguration.DEFAULT_CS_CONF_PROVIDER); + if (confProviderStr.equals( + CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER)) { + this.csConfProvider = new FileBasedCSConfigurationProvider(rmContext); + } else { + throw new IOException("Invalid CS configuration provider: " + + confProviderStr); + } + this.csConfProvider.init(configuration); + this.conf = this.csConfProvider.loadConfiguration(configuration); validateConf(this.conf); this.minimumAllocation = this.conf.getMinimumAllocation(); initMaximumResourceCapability(this.conf.getMaximumAllocation()); @@ -400,7 +414,7 @@ public class CapacityScheduler extends writeLock.lock(); Configuration configuration = new Configuration(newConf); CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = loadCapacitySchedulerConfiguration(configuration); + this.conf = csConfProvider.loadConfiguration(configuration); validateConf(this.conf); try { LOG.info("Re-initializing queues..."); @@ -1832,23 +1846,6 @@ public class CapacityScheduler extends return true; } - private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( - Configuration configuration) throws IOException { - try { - InputStream CSInputStream = - this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(configuration, - YarnConfiguration.CS_CONFIGURATION_FILE); - if (CSInputStream != null) { - configuration.addResource(CSInputStream); - return new CapacitySchedulerConfiguration(configuration, false); - } - return new CapacitySchedulerConfiguration(configuration, true); - } catch (Exception e) { - throw new IOException(e); - } - } - private String getDefaultReservationQueueName(String planQueueName) { return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3a519ecf5f1..3821e24a74e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -315,6 +315,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1; + + public static final String CS_CONF_PROVIDER = PREFIX + + "configuration.provider"; + + @Private + public static final String FILE_CS_CONF_PROVIDER = "file"; + + @Private + public static final String DEFAULT_CS_CONF_PROVIDER = FILE_CS_CONF_PROVIDER; AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java new file mode 100644 index 00000000000..c9984ac7378 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.io.IOException; + +/** + * Configuration provider for {@link CapacityScheduler}. + */ +public interface CSConfigurationProvider { + + /** + * Initialize the configuration provider with given conf. + * @param conf configuration to initialize with + */ + void init(Configuration conf); + + /** + * Loads capacity scheduler configuration object. + * @param conf initial bootstrap configuration + * @return CS configuration + * @throws IOException if fail to retrieve configuration + */ + CapacitySchedulerConfiguration loadConfiguration(Configuration conf) + throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FileBasedCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FileBasedCSConfigurationProvider.java new file mode 100644 index 00000000000..51c64fac45a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FileBasedCSConfigurationProvider.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.io.IOException; +import java.io.InputStream; + +/** + * {@link CapacityScheduler} configuration provider based on local + * {@code capacity-scheduler.xml} file. + */ +public class FileBasedCSConfigurationProvider implements + CSConfigurationProvider { + + private RMContext rmContext; + + /** + * Construct file based CS configuration provider with given context. + * @param rmContext the RM context + */ + public FileBasedCSConfigurationProvider(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void init(Configuration conf) {} + + @Override + public CapacitySchedulerConfiguration loadConfiguration(Configuration conf) + throws IOException { + try { + InputStream csInputStream = + this.rmContext.getConfigurationProvider() + .getConfigurationInputStream(conf, + YarnConfiguration.CS_CONFIGURATION_FILE); + if (csInputStream != null) { + conf.addResource(csInputStream); + return new CapacitySchedulerConfiguration(conf, false); + } + return new CapacitySchedulerConfiguration(conf, true); + } catch (Exception e) { + throw new IOException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/package-info.java new file mode 100644 index 00000000000..08d0522d318 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package + * org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf + * contains classes related to capacity scheduler configuration management. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 74e3a6af45c..89e622992fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -246,13 +246,13 @@ public class TestCapacityScheduler { @Test (timeout = 30000) public void testConfValidation() throws Exception { - ResourceScheduler scheduler = new CapacityScheduler(); + CapacityScheduler scheduler = new CapacityScheduler(); scheduler.setRMContext(resourceManager.getRMContext()); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, mockContext); + scheduler.init(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) {