MAPREDUCE-3765. FifoScheduler does not respect yarn.scheduler.fifo.minimum-allocation-mb setting (Hitesh Shah via mahadev) - Merging r1240634 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1240635 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2012-02-04 22:51:10 +00:00
parent aaa17aa6ab
commit 4eb850ceb7
4 changed files with 53 additions and 21 deletions

View File

@ -657,6 +657,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3775. Change MiniYarnCluster to escape special chars in testname. MAPREDUCE-3775. Change MiniYarnCluster to escape special chars in testname.
(Hitesh Shah via mahadev) (Hitesh Shah via mahadev)
MAPREDUCE-3765. FifoScheduler does not respect yarn.scheduler.fifo.minimum-
allocation-mb setting (Hitesh Shah via mahadev)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -161,6 +161,12 @@
</Or> </Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<!-- Inconsistent sync warning - minimumAllocation is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler" />
<Field name="minimumAllocation" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Don't care if putIfAbsent value is ignored --> <!-- Don't care if putIfAbsent value is ignored -->
<Match> <Match>

View File

@ -230,7 +230,7 @@ public Allocation allocate(
} }
// Sanity check // Sanity check
SchedulerUtils.normalizeRequests(ask, MINIMUM_MEMORY); SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
// Release containers // Release containers
for (ContainerId releasedContainer : release) { for (ContainerId releasedContainer : release) {
@ -592,7 +592,7 @@ private synchronized void nodeUpdate(RMNode rmNode,
minimumAllocation)) { minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() + LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource()); " available resource = " + node.getAvailableResource());
assignContainers(node); assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "

View File

@ -24,38 +24,22 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestFifoScheduler { public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
private ResourceManager resourceManager = null; private final int GB = 1024;
@Before
public void setUp() throws Exception {
Store store = StoreFactory.getStore(new Configuration());
resourceManager = new ResourceManager(store);
resourceManager.init(new Configuration());
}
@After
public void tearDown() throws Exception {
}
@Test @Test
public void test() throws Exception { public void test() throws Exception {
@ -63,7 +47,6 @@ public void test() throws Exception {
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM(); MockRM rm = new MockRM();
rm.start(); rm.start();
int GB = 1024;
MockNM nm1 = rm.registerNode("h1:1234", 6 * GB); MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
MockNM nm2 = rm.registerNode("h2:5678", 4 * GB); MockNM nm2 = rm.registerNode("h2:5678", 4 * GB);
@ -146,8 +129,48 @@ public void test() throws Exception {
rm.stop(); rm.stop();
} }
private void testMinimumAllocation(YarnConfiguration conf)
throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(256);
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
nm1.getNodeId());
int checkAlloc =
conf.getInt("yarn.scheduler.fifo.minimum-allocation-mb", GB);
Assert.assertEquals(checkAlloc, report_nm1.getUsedResource().getMemory());
rm.stop();
}
@Test
public void testDefaultMinimumAllocation() throws Exception {
testMinimumAllocation(new YarnConfiguration());
}
@Test
public void testNonDefaultMinimumAllocation() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt("yarn.scheduler.fifo.minimum-allocation-mb", 512);
testMinimumAllocation(conf);
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestFifoScheduler t = new TestFifoScheduler(); TestFifoScheduler t = new TestFifoScheduler();
t.test(); t.test();
t.testDefaultMinimumAllocation();
t.testNonDefaultMinimumAllocation();
} }
} }