YARN-5951. Changes to allow CapacityScheduler to use configuration store
This commit is contained in:
parent
4d377c89bb
commit
cd4bc54e6c
|
@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -108,6 +107,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
||||||
|
@ -168,6 +169,8 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
private int maxAssignPerHeartbeat;
|
private int maxAssignPerHeartbeat;
|
||||||
|
|
||||||
|
private CSConfigurationProvider csConfProvider;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setConf(Configuration conf) {
|
public void setConf(Configuration conf) {
|
||||||
yarnConf = conf;
|
yarnConf = conf;
|
||||||
|
@ -290,7 +293,18 @@ public class CapacityScheduler extends
|
||||||
IOException {
|
IOException {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
this.conf = loadCapacitySchedulerConfiguration(configuration);
|
String confProviderStr = configuration.get(
|
||||||
|
CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
|
||||||
|
CapacitySchedulerConfiguration.DEFAULT_CS_CONF_PROVIDER);
|
||||||
|
if (confProviderStr.equals(
|
||||||
|
CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER)) {
|
||||||
|
this.csConfProvider = new FileBasedCSConfigurationProvider(rmContext);
|
||||||
|
} else {
|
||||||
|
throw new IOException("Invalid CS configuration provider: " +
|
||||||
|
confProviderStr);
|
||||||
|
}
|
||||||
|
this.csConfProvider.init(configuration);
|
||||||
|
this.conf = this.csConfProvider.loadConfiguration(configuration);
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
this.minimumAllocation = this.conf.getMinimumAllocation();
|
this.minimumAllocation = this.conf.getMinimumAllocation();
|
||||||
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
initMaximumResourceCapability(this.conf.getMaximumAllocation());
|
||||||
|
@ -400,7 +414,7 @@ public class CapacityScheduler extends
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
Configuration configuration = new Configuration(newConf);
|
Configuration configuration = new Configuration(newConf);
|
||||||
CapacitySchedulerConfiguration oldConf = this.conf;
|
CapacitySchedulerConfiguration oldConf = this.conf;
|
||||||
this.conf = loadCapacitySchedulerConfiguration(configuration);
|
this.conf = csConfProvider.loadConfiguration(configuration);
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
try {
|
try {
|
||||||
LOG.info("Re-initializing queues...");
|
LOG.info("Re-initializing queues...");
|
||||||
|
@ -1832,23 +1846,6 @@ public class CapacityScheduler extends
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration(
|
|
||||||
Configuration configuration) throws IOException {
|
|
||||||
try {
|
|
||||||
InputStream CSInputStream =
|
|
||||||
this.rmContext.getConfigurationProvider()
|
|
||||||
.getConfigurationInputStream(configuration,
|
|
||||||
YarnConfiguration.CS_CONFIGURATION_FILE);
|
|
||||||
if (CSInputStream != null) {
|
|
||||||
configuration.addResource(CSInputStream);
|
|
||||||
return new CapacitySchedulerConfiguration(configuration, false);
|
|
||||||
}
|
|
||||||
return new CapacitySchedulerConfiguration(configuration, true);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getDefaultReservationQueueName(String planQueueName) {
|
private String getDefaultReservationQueueName(String planQueueName) {
|
||||||
return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
|
return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
|
||||||
}
|
}
|
||||||
|
|
|
@ -316,6 +316,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
@Private
|
@Private
|
||||||
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
|
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
|
||||||
|
|
||||||
|
public static final String CS_CONF_PROVIDER = PREFIX
|
||||||
|
+ "configuration.provider";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String FILE_CS_CONF_PROVIDER = "file";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String DEFAULT_CS_CONF_PROVIDER = FILE_CS_CONF_PROVIDER;
|
||||||
|
|
||||||
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
|
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
|
||||||
|
|
||||||
public CapacitySchedulerConfiguration() {
|
public CapacitySchedulerConfiguration() {
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration provider for {@link CapacityScheduler}.
|
||||||
|
*/
|
||||||
|
public interface CSConfigurationProvider {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the configuration provider with given conf.
|
||||||
|
* @param conf configuration to initialize with
|
||||||
|
*/
|
||||||
|
void init(Configuration conf);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Loads capacity scheduler configuration object.
|
||||||
|
* @param conf initial bootstrap configuration
|
||||||
|
* @return CS configuration
|
||||||
|
* @throws IOException if fail to retrieve configuration
|
||||||
|
*/
|
||||||
|
CapacitySchedulerConfiguration loadConfiguration(Configuration conf)
|
||||||
|
throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link CapacityScheduler} configuration provider based on local
|
||||||
|
* {@code capacity-scheduler.xml} file.
|
||||||
|
*/
|
||||||
|
public class FileBasedCSConfigurationProvider implements
|
||||||
|
CSConfigurationProvider {
|
||||||
|
|
||||||
|
private RMContext rmContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct file based CS configuration provider with given context.
|
||||||
|
* @param rmContext the RM context
|
||||||
|
*/
|
||||||
|
public FileBasedCSConfigurationProvider(RMContext rmContext) {
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Configuration conf) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CapacitySchedulerConfiguration loadConfiguration(Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
InputStream csInputStream =
|
||||||
|
this.rmContext.getConfigurationProvider()
|
||||||
|
.getConfigurationInputStream(conf,
|
||||||
|
YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||||
|
if (csInputStream != null) {
|
||||||
|
conf.addResource(csInputStream);
|
||||||
|
return new CapacitySchedulerConfiguration(conf, false);
|
||||||
|
}
|
||||||
|
return new CapacitySchedulerConfiguration(conf, true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Package
|
||||||
|
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf
|
||||||
|
* contains classes related to capacity scheduler configuration management.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -246,13 +246,13 @@ public class TestCapacityScheduler {
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testConfValidation() throws Exception {
|
public void testConfValidation() throws Exception {
|
||||||
ResourceScheduler scheduler = new CapacityScheduler();
|
CapacityScheduler scheduler = new CapacityScheduler();
|
||||||
scheduler.setRMContext(resourceManager.getRMContext());
|
scheduler.setRMContext(resourceManager.getRMContext());
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
|
||||||
try {
|
try {
|
||||||
scheduler.reinitialize(conf, mockContext);
|
scheduler.init(conf);
|
||||||
fail("Exception is expected because the min memory allocation is" +
|
fail("Exception is expected because the min memory allocation is" +
|
||||||
" larger than the max memory allocation.");
|
" larger than the max memory allocation.");
|
||||||
} catch (YarnRuntimeException e) {
|
} catch (YarnRuntimeException e) {
|
||||||
|
|
Loading…
Reference in New Issue