YARN-2252. Intermittent failure of TestFairScheduler.testContinuousScheduling. (Ratandeep Ratti and kasha via kasha)
(cherry picked from commit f5578207d2
)
This commit is contained in:
parent
8ba8521de5
commit
024bcfdc0d
|
@ -397,6 +397,10 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2584. TestContainerManagerSecurity fails on trunk. (Jian He via
|
YARN-2584. TestContainerManagerSecurity fails on trunk. (Jian He via
|
||||||
junping_du)
|
junping_du)
|
||||||
|
|
||||||
|
YARN-2252. Intermittent failure of
|
||||||
|
TestFairScheduler.testContinuousScheduling.
|
||||||
|
(Ratandeep Ratti and kasha via kasha)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -131,8 +131,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
|
if (scheduler != null) {
|
||||||
|
scheduler.stop();
|
||||||
scheduler = null;
|
scheduler = null;
|
||||||
|
}
|
||||||
|
if (resourceManager != null) {
|
||||||
|
resourceManager.stop();
|
||||||
resourceManager = null;
|
resourceManager = null;
|
||||||
|
}
|
||||||
QueueMetrics.clearQueueMetrics();
|
QueueMetrics.clearQueueMetrics();
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -140,7 +146,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testConfValidation() throws Exception {
|
public void testConfValidation() throws Exception {
|
||||||
FairScheduler scheduler = new FairScheduler();
|
scheduler = new FairScheduler();
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
|
||||||
|
@ -212,7 +218,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNonMinZeroResourcesSettings() throws IOException {
|
public void testNonMinZeroResourcesSettings() throws IOException {
|
||||||
FairScheduler fs = new FairScheduler();
|
scheduler = new FairScheduler();
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256);
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
|
||||||
|
@ -220,17 +226,17 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
||||||
fs.init(conf);
|
scheduler.init(conf);
|
||||||
fs.reinitialize(conf, null);
|
scheduler.reinitialize(conf, null);
|
||||||
Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
|
Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemory());
|
||||||
Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
|
Assert.assertEquals(1, scheduler.getMinimumResourceCapability().getVirtualCores());
|
||||||
Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
|
Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory());
|
||||||
Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
|
Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMinZeroResourcesSettings() throws IOException {
|
public void testMinZeroResourcesSettings() throws IOException {
|
||||||
FairScheduler fs = new FairScheduler();
|
scheduler = new FairScheduler();
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
|
||||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0);
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0);
|
||||||
|
@ -238,12 +244,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
|
||||||
fs.init(conf);
|
scheduler.init(conf);
|
||||||
fs.reinitialize(conf, null);
|
scheduler.reinitialize(conf, null);
|
||||||
Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
|
Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemory());
|
||||||
Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
|
Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getVirtualCores());
|
||||||
Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
|
Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory());
|
||||||
Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
|
Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -3293,49 +3299,49 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
@Test (timeout = 10000)
|
@Test (timeout = 10000)
|
||||||
public void testContinuousScheduling() throws Exception {
|
public void testContinuousScheduling() throws Exception {
|
||||||
// set continuous scheduling enabled
|
// set continuous scheduling enabled
|
||||||
FairScheduler fs = new FairScheduler();
|
scheduler = new FairScheduler();
|
||||||
Configuration conf = createConfiguration();
|
Configuration conf = createConfiguration();
|
||||||
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
|
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
|
||||||
true);
|
true);
|
||||||
fs.setRMContext(resourceManager.getRMContext());
|
scheduler.setRMContext(resourceManager.getRMContext());
|
||||||
fs.init(conf);
|
scheduler.init(conf);
|
||||||
fs.start();
|
scheduler.start();
|
||||||
fs.reinitialize(conf, resourceManager.getRMContext());
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
Assert.assertTrue("Continuous scheduling should be enabled.",
|
Assert.assertTrue("Continuous scheduling should be enabled.",
|
||||||
fs.isContinuousSchedulingEnabled());
|
scheduler.isContinuousSchedulingEnabled());
|
||||||
|
|
||||||
// Add two nodes
|
// Add two nodes
|
||||||
RMNode node1 =
|
RMNode node1 =
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||||
"127.0.0.1");
|
"127.0.0.1");
|
||||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
fs.handle(nodeEvent1);
|
scheduler.handle(nodeEvent1);
|
||||||
RMNode node2 =
|
RMNode node2 =
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||||
"127.0.0.2");
|
"127.0.0.2");
|
||||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
fs.handle(nodeEvent2);
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
// available resource
|
// available resource
|
||||||
Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
|
Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024);
|
||||||
Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
|
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
|
||||||
|
|
||||||
// send application request
|
// send application request
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
|
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
|
||||||
fs.addApplicationAttempt(appAttemptId, false, false);
|
scheduler.addApplicationAttempt(appAttemptId, false, false);
|
||||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
ResourceRequest request =
|
ResourceRequest request =
|
||||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||||
ask.add(request);
|
ask.add(request);
|
||||||
fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
|
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
|
||||||
|
|
||||||
// waiting for continuous_scheduler_sleep_time
|
// waiting for continuous_scheduler_sleep_time
|
||||||
// at least one pass
|
// at least one pass
|
||||||
Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
|
Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500);
|
||||||
|
|
||||||
FSAppAttempt app = fs.getSchedulerApp(appAttemptId);
|
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||||
// Wait until app gets resources.
|
// Wait until app gets resources.
|
||||||
while (app.getCurrentConsumption().equals(Resources.none())) { }
|
while (app.getCurrentConsumption().equals(Resources.none())) { }
|
||||||
|
|
||||||
|
@ -3348,7 +3354,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
||||||
ask.clear();
|
ask.clear();
|
||||||
ask.add(request);
|
ask.add(request);
|
||||||
fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
|
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
|
||||||
|
|
||||||
// Wait until app gets resources
|
// Wait until app gets resources
|
||||||
while (app.getCurrentConsumption()
|
while (app.getCurrentConsumption()
|
||||||
|
|
Loading…
Reference in New Issue