Merge -c 1391929 from trunk to branch-2 to fix YARN-137. Change the default YARN scheduler to be the CapacityScheduler. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1391930 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-09-30 00:44:15 +00:00
parent 060e4f256d
commit 7418b5f357
17 changed files with 99 additions and 29 deletions

View File

@ -48,6 +48,9 @@ Release 2.0.2-alpha - 2012-09-07
YARN-80. Add support for delaying rack-local containers in
CapacityScheduler. (acmurthy)
YARN-137. Change the default YARN scheduler to be the CapacityScheduler.
(sseth via acmurthy)
OPTIMIZATIONS
BUG FIXES

View File

@ -64,4 +64,8 @@ public abstract class Priority implements Comparable<Priority> {
return this.getPriority() - other.getPriority();
}
@Override
public String toString() {
return "{Priority: " + getPriority() + "}";
}
}

View File

@ -163,4 +163,9 @@ public class ResourceRequestPBImpl extends ResourceRequest {
return ((ResourcePBImpl)t).getProto();
}
@Override
public String toString() {
return "{Priority: " + getPriority() + ", Capability: " + getCapability()
+ "}";
}
}

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -41,7 +43,7 @@ public class TestDistributedShell {
LogFactory.getLog(TestDistributedShell.class);
protected static MiniYARNCluster yarnCluster = null;
protected static Configuration conf = new Configuration();
protected static Configuration conf = new YarnConfiguration();
protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class);
@ -49,6 +51,8 @@ public class TestDistributedShell {
public static void setup() throws InterruptedException, IOException {
LOG.info("Starting up YARN cluster");
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName(),
1, 1, 1);

View File

@ -208,6 +208,8 @@ public class YarnConfiguration extends Configuration {
public static final String RM_SCHEDULER =
RM_PREFIX + "scheduler.class";
public static final String DEFAULT_RM_SCHEDULER =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
//Delegation token related keys
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =

View File

@ -185,6 +185,7 @@
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
<property>

View File

@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -256,10 +255,22 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected ResourceScheduler createScheduler() {
return ReflectionUtils.newInstance(this.conf.getClass(
YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class), this.conf);
}
String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
YarnConfiguration.DEFAULT_RM_SCHEDULER);
LOG.info("Using Scheduler: " + schedulerClassName);
try {
Class<?> schedulerClazz = Class.forName(schedulerClassName);
if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
this.conf);
} else {
throw new YarnException("Class: " + schedulerClassName
+ " not instance of " + ResourceScheduler.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnException("Could not instantiate Scheduler: "
+ schedulerClassName, e);
} }
protected ApplicationMasterLauncher createAMLauncher() {
return new ApplicationMasterLauncher(this.clientToAMSecretManager,

View File

@ -112,7 +112,7 @@ public class MockAM {
ResourceRequest hostReq = createResourceReq(host, memory, priority,
containers);
reqs.add(hostReq);
ResourceRequest rackReq = createResourceReq("default-rack", memory,
ResourceRequest rackReq = createResourceReq("/default-rack", memory,
priority, containers);
reqs.add(rackReq);
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
@ -58,12 +59,12 @@ import org.apache.log4j.Logger;
public class MockRM extends ResourceManager {
public MockRM() {
this(new Configuration());
this(new YarnConfiguration());
}
public MockRM(Configuration conf) {
super(StoreFactory.getStore(conf));
init(conf);
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
}

View File

@ -52,18 +52,27 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
private final int GB = 1024;
private static YarnConfiguration conf;
@BeforeClass
public static void setup() {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
}
@Test
public void test() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
MockNM nm2 = rm.registerNode("h2:5678", 4 * GB);
@ -178,7 +187,7 @@ public class TestFifoScheduler {
public void testDefaultMinimumAllocation() throws Exception {
// Test with something lesser than default
testMinimumAllocation(
new YarnConfiguration(),
new YarnConfiguration(TestFifoScheduler.conf),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB / 2);
}
@ -186,7 +195,7 @@ public class TestFifoScheduler {
public void testNonDefaultMinimumAllocation() throws Exception {
// Set custom min-alloc to test tweaking it
int allocMB = 512;
YarnConfiguration conf = new YarnConfiguration();
YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, allocMB);
// Test for something lesser than this.
testMinimumAllocation(conf, allocMB / 2);

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
@ -45,7 +46,7 @@ public class TestResourceManager {
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
Configuration conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager.init(conf);

View File

@ -342,10 +342,12 @@ public class TestResourceTrackerService {
MockNM nm2 = rm.registerNode("host2:5678", 5120);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
dispatcher.await();
checkUnealthyNMCount(rm, nm2, true, 1);
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
// TODO Metrics incorrect in case of the FifoScheduler
Assert.assertEquals(5120, metrics.getAvailableMB());
// reconnect of healthy node
nm1 = rm.registerNode("host1:1234", 5120);

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -93,7 +94,7 @@ public class TestFairScheduler {
@Before
public void setUp() throws IOException {
scheduler = new FairScheduler();
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
Store store = StoreFactory.getStore(conf);
@ -109,6 +110,13 @@ public class TestFairScheduler {
resourceManager = null;
}
private Configuration createConfiguration() {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
return conf;
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class);
@ -278,7 +286,7 @@ public class TestFairScheduler {
@Test
public void testUserAsDefaultQueue() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.reinitialize(conf, resourceManager.getRMContext());
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
@ -299,7 +307,7 @@ public class TestFairScheduler {
@Test
public void testFairShareWithMinAlloc() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
@ -405,7 +413,7 @@ public class TestFairScheduler {
@Test
public void testAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
@ -508,7 +516,7 @@ public class TestFairScheduler {
@Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
@ -611,7 +619,7 @@ public class TestFairScheduler {
@Test
public void testIsStarvedForMinShare() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
@ -670,7 +678,7 @@ public class TestFairScheduler {
@Test
public void testIsStarvedForFairShare() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
@ -734,7 +742,7 @@ public class TestFairScheduler {
* now this means decreasing order of priority.
*/
public void testChoiceOfPreemptedContainers() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
@ -867,7 +875,7 @@ public class TestFairScheduler {
* Tests the timing of decision to preempt tasks.
*/
public void testPreemptionDecision() throws Exception {
Configuration conf = new Configuration();
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
scheduler.setClock(clock);

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -59,7 +61,10 @@ public class TestFifoScheduler {
public void setUp() throws Exception {
Store store = StoreFactory.getStore(new Configuration());
resourceManager = new ResourceManager(store);
resourceManager.init(new Configuration());
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
resourceManager.init(conf);
}
@After

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
@ -74,7 +75,10 @@ public class TestRMWebServices extends JerseyTest {
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
rm = new MockRM(new Configuration());
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
bind(RMContext.class).toInstance(rm.getRMContext());
bind(ApplicationACLsManager.class).toInstance(

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
@ -82,6 +84,8 @@ public class TestRMWebServicesApps extends JerseyTest {
bind(GenericExceptionHandler.class);
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 2);
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
bind(RMContext.class).toInstance(rm.getRMContext());

View File

@ -103,6 +103,12 @@ public class MiniYARNCluster extends CompositeService {
}
}
@Override
public void init(Configuration conf) {
super.init(conf instanceof YarnConfiguration ? conf
: new YarnConfiguration(conf));
}
public File getTestWorkDir() {
return testWorkDir;
}
@ -201,7 +207,7 @@ public class MiniYARNCluster extends CompositeService {
}
public synchronized void init(Configuration conf) {
Configuration config = new Configuration(conf);
Configuration config = new YarnConfiguration(conf);
super.init(config);
}