ARN-803. factor out scheduler config validation from the ResourceManager to each scheduler implementation. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493160 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-06-14 17:07:25 +00:00
parent 0ef156a86e
commit 334de8d211
9 changed files with 207 additions and 68 deletions

View File

@ -343,6 +343,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-782. vcores-pcores ratio functions differently from vmem-pmem ratio in
misleading way. (sandyr via tucu)
YARN-803. factor out scheduler config validation from the ResourceManager
to each scheduler implementation. (tucu)
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -350,42 +350,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
+ ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
+ "=" + globalMaxAppAttempts + ", it should be a positive integer.");
}
// validate scheduler memory allocation setting
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem <= 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
// validate scheduler vcores allocation setting
int minVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
if (minVcores <= 0 || minVcores > maxVcores) {
throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ "=" + minVcores
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ "=" + maxVcores + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}
@Private

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@ -117,6 +119,44 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
yarnConf = conf;
}
private void validateConf(Configuration conf) {
// validate scheduler memory allocation setting
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem <= 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
// validate scheduler vcores allocation setting
int minVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
if (minVcores <= 0 || minVcores > maxVcores) {
throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ "=" + minVcores
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ "=" + maxVcores + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}
@Override
public Configuration getConf() {
return yarnConf;
@ -211,7 +251,7 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
if (!initialized) {
this.conf = new CapacitySchedulerConfiguration(conf);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
this.calculator = this.conf.getResourceCalculator();
@ -229,6 +269,7 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = new CapacitySchedulerConfiguration(conf);
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
reinitializeQueues(this.conf);

View File

@ -35,10 +35,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -109,7 +111,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
@Unstable
@SuppressWarnings("unchecked")
public class FairScheduler implements ResourceScheduler {
private boolean initialized;
private FairSchedulerConfiguration conf;
private RMContext rmContext;
@ -185,6 +186,44 @@ public class FairScheduler implements ResourceScheduler {
queueMgr = new QueueManager(this);
}
private void validateConf(Configuration conf) {
// validate scheduler memory allocation setting
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem < 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min should equal greater than 0"
+ ", max should be no smaller than min.");
}
// validate scheduler vcores allocation setting
int minVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
if (minVcores < 0 || minVcores > maxVcores) {
throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ "=" + minVcores
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ "=" + maxVcores + ", min should equal greater than 0"
+ ", max should be no smaller than min.");
}
}
public FairSchedulerConfiguration getConf() {
return conf;
}
@ -986,6 +1025,7 @@ public class FairScheduler implements ResourceScheduler {
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -175,6 +176,26 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
this.conf = conf;
}
private void validateConf(Configuration conf) {
// validate scheduler memory allocation setting
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem <= 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}
@Override
public synchronized Configuration getConf() {
return conf;
@ -201,6 +222,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
{
setConf(conf);
if (!this.initialized) {
validateConf(conf);
this.rmContext = rmContext;
this.minimumAllocation =
Resources.createResource(conf.getInt(

View File

@ -25,7 +25,9 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -41,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -55,6 +58,9 @@ import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@ -68,6 +74,24 @@ public class TestFifoScheduler {
FifoScheduler.class, ResourceScheduler.class);
}
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new FifoScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
scheduler.reinitialize(conf, null);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid resource scheduler memory"));
}
}
@Test
public void test() throws Exception {
Logger rootLogger = LogManager.getRootLogger();

View File

@ -201,34 +201,6 @@ public class TestResourceManager {
if (!e.getMessage().startsWith(
"Invalid global max attempts configuration")) throw e;
}
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
resourceManager = new ResourceManager();
try {
resourceManager.init(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
// Exception is expected.
if (!e.getMessage().startsWith(
"Invalid resource scheduler memory")) throw e;
}
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
resourceManager = new ResourceManager();
try {
resourceManager.init(conf);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
// Exception is expected.
if (!e.getMessage().startsWith(
"Invalid resource scheduler vcores")) throw e;
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
@ -30,7 +31,10 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -99,6 +103,39 @@ public class TestCapacityScheduler {
resourceManager.stop();
}
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new CapacityScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
scheduler.reinitialize(conf, null);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid resource scheduler memory"));
}
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
scheduler.reinitialize(conf, null);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid resource scheduler vcores"));
}
}
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability)

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
@ -38,11 +39,13 @@ import javax.xml.parsers.ParserConfigurationException;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -137,6 +140,39 @@ public class TestFairScheduler {
DefaultMetricsSystem.shutdown();
}
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new FairScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
scheduler.reinitialize(conf, null);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid resource scheduler memory"));
}
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
scheduler.reinitialize(conf, null);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid resource scheduler vcores"));
}
}
private Configuration createConfiguration() {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,