YARN-4812. TestFairScheduler#testContinuousScheduling fails intermittently. (kasha)
(cherry picked from commit f84af8bd588763c4e99305742d8c86ed596e8359) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
This commit is contained in:
parent
d1ae1fb449
commit
d382a21423
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import org.junit.Assert;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ -50,7 +51,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
public class FairSchedulerTestBase {
|
||||
public final static String TEST_DIR =
|
||||
@ -66,6 +67,8 @@ public class FairSchedulerTestBase {
|
||||
protected FairScheduler scheduler;
|
||||
protected ResourceManager resourceManager;
|
||||
public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
|
||||
private static final int SLEEP_DURATION = 10;
|
||||
private static final int SLEEP_RETRIES = 1000;
|
||||
|
||||
// Helper methods
|
||||
public Configuration createConfiguration() {
|
||||
@ -260,4 +263,21 @@ protected RMApp createMockRMApp(ApplicationAttemptId attemptId) {
|
||||
.put(attemptId.getApplicationId(), app);
|
||||
return app;
|
||||
}
|
||||
|
||||
protected void checkAppConsumption(FSAppAttempt app, Resource resource)
|
||||
throws InterruptedException {
|
||||
for (int i = 0; i < SLEEP_RETRIES; i++) {
|
||||
if (Resources.equals(resource, app.getCurrentConsumption())) {
|
||||
break;
|
||||
} else {
|
||||
Thread.sleep(SLEEP_DURATION);
|
||||
}
|
||||
}
|
||||
|
||||
// available resource
|
||||
Assert.assertEquals(resource.getMemory(),
|
||||
app.getCurrentConsumption().getMemory());
|
||||
Assert.assertEquals(resource.getVirtualCores(),
|
||||
app.getCurrentConsumption().getVirtualCores());
|
||||
}
|
||||
}
|
@ -21,9 +21,11 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
|
||||
@ -31,13 +33,17 @@
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
||||
private ControlledClock mockClock;
|
||||
@ -78,7 +84,7 @@ public void teardown() {
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testSchedulingDelay() throws InterruptedException {
|
||||
public void testBasic() throws InterruptedException {
|
||||
// Add one node
|
||||
String host = "127.0.0.1";
|
||||
RMNode node1 = MockNodes.newNodeInfo(
|
||||
@ -88,8 +94,6 @@ public void testSchedulingDelay() throws InterruptedException {
|
||||
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(nodeUpdateEvent);
|
||||
|
||||
// Create one application and submit one each of node-local, rack-local
|
||||
// and ANY requests
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
createMockRMApp(appAttemptId);
|
||||
@ -102,11 +106,69 @@ public void testSchedulingDelay() throws InterruptedException {
|
||||
appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||
|
||||
// Advance time and let continuous scheduling kick in
|
||||
mockClock.tickSec(1);
|
||||
while (1024 != app.getCurrentConsumption().getMemorySize()) {
|
||||
Thread.sleep(100);
|
||||
triggerSchedulingAttempt();
|
||||
checkAppConsumption(app, Resources.createResource(1024, 1));
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testSortedNodes() throws Exception {
|
||||
// Add two nodes
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
RMNode node2 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||
"127.0.0.2");
|
||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||
scheduler.handle(nodeEvent2);
|
||||
|
||||
// available resource
|
||||
Assert.assertEquals(scheduler.getClusterResource().getMemorySize(), 16 * 1024);
|
||||
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
|
||||
|
||||
// send application request
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
createMockRMApp(appAttemptId);
|
||||
|
||||
scheduler.addApplication(appAttemptId.getApplicationId(),
|
||||
"queue11", "user11", false);
|
||||
scheduler.addApplicationAttempt(appAttemptId, false, false);
|
||||
List<ResourceRequest> ask = new ArrayList<>();
|
||||
ResourceRequest request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||
ask.add(request);
|
||||
scheduler.allocate(appAttemptId, ask,
|
||||
new ArrayList<ContainerId>(), null, null, null, null);
|
||||
triggerSchedulingAttempt();
|
||||
|
||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||
checkAppConsumption(app, Resources.createResource(1024, 1));
|
||||
|
||||
// another request
|
||||
request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
||||
ask.clear();
|
||||
ask.add(request);
|
||||
scheduler.allocate(appAttemptId, ask,
|
||||
new ArrayList<ContainerId>(), null, null, null, null);
|
||||
triggerSchedulingAttempt();
|
||||
|
||||
checkAppConsumption(app, Resources.createResource(2048,2));
|
||||
|
||||
// 2 containers should be assigned to 2 nodes
|
||||
Set<NodeId> nodes = new HashSet<NodeId>();
|
||||
Iterator<RMContainer> it = app.getLiveContainers().iterator();
|
||||
while (it.hasNext()) {
|
||||
nodes.add(it.next().getContainer().getNodeId());
|
||||
}
|
||||
assertEquals(1024, app.getCurrentConsumption().getMemorySize());
|
||||
Assert.assertEquals(2, nodes.size());
|
||||
}
|
||||
|
||||
private void triggerSchedulingAttempt() {
|
||||
mockClock.tickMsec(
|
||||
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
|
||||
}
|
||||
}
|
||||
|
@ -4637,81 +4637,6 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
|
||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testContinuousScheduling() throws Exception {
|
||||
// set continuous scheduling enabled
|
||||
scheduler = new FairScheduler();
|
||||
Configuration conf = createConfiguration();
|
||||
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
|
||||
true);
|
||||
scheduler.setRMContext(resourceManager.getRMContext());
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
Assert.assertTrue("Continuous scheduling should be enabled.",
|
||||
scheduler.isContinuousSchedulingEnabled());
|
||||
|
||||
// Add two nodes
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
RMNode node2 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||
"127.0.0.2");
|
||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||
scheduler.handle(nodeEvent2);
|
||||
|
||||
// available resource
|
||||
Assert.assertEquals(scheduler.getClusterResource().getMemorySize(), 16 * 1024);
|
||||
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
|
||||
|
||||
// send application request
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
createMockRMApp(appAttemptId);
|
||||
|
||||
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
|
||||
scheduler.addApplicationAttempt(appAttemptId, false, false);
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||
ask.add(request);
|
||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
||||
|
||||
// waiting for continuous_scheduler_sleep_time
|
||||
// at least one pass
|
||||
Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500);
|
||||
|
||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||
// Wait until app gets resources.
|
||||
while (app.getCurrentConsumption().equals(Resources.none())) { }
|
||||
|
||||
// check consumption
|
||||
Assert.assertEquals(1024, app.getCurrentConsumption().getMemorySize());
|
||||
Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
|
||||
|
||||
// another request
|
||||
request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
||||
ask.clear();
|
||||
ask.add(request);
|
||||
scheduler.stop();
|
||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
||||
scheduler.continuousSchedulingAttempt();
|
||||
Assert.assertEquals(2048, app.getCurrentConsumption().getMemorySize());
|
||||
Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores());
|
||||
|
||||
// 2 containers should be assigned to 2 nodes
|
||||
Set<NodeId> nodes = new HashSet<NodeId>();
|
||||
Iterator<RMContainer> it = app.getLiveContainers().iterator();
|
||||
while (it.hasNext()) {
|
||||
nodes.add(it.next().getContainer().getNodeId());
|
||||
}
|
||||
Assert.assertEquals(2, nodes.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContinuousSchedulingWithNodeRemoved() throws Exception {
|
||||
// Disable continuous scheduling, will invoke continuous scheduling once manually
|
||||
|
Loading…
x
Reference in New Issue
Block a user