YARN-4812. TestFairScheduler#testContinuousScheduling fails intermittently. (kasha)
This commit is contained in:
parent
80fa70c4e1
commit
f84af8bd58
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import org.junit.Assert;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -50,7 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
public class FairSchedulerTestBase {
|
||||
public final static String TEST_DIR =
|
||||
|
@ -66,6 +67,8 @@ public class FairSchedulerTestBase {
|
|||
protected FairScheduler scheduler;
|
||||
protected ResourceManager resourceManager;
|
||||
public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
|
||||
private static final int SLEEP_DURATION = 10;
|
||||
private static final int SLEEP_RETRIES = 1000;
|
||||
|
||||
// Helper methods
|
||||
public Configuration createConfiguration() {
|
||||
|
@ -260,4 +263,21 @@ public class FairSchedulerTestBase {
|
|||
.put(attemptId.getApplicationId(), app);
|
||||
return app;
|
||||
}
|
||||
|
||||
protected void checkAppConsumption(FSAppAttempt app, Resource resource)
|
||||
throws InterruptedException {
|
||||
for (int i = 0; i < SLEEP_RETRIES; i++) {
|
||||
if (Resources.equals(resource, app.getCurrentConsumption())) {
|
||||
break;
|
||||
} else {
|
||||
Thread.sleep(SLEEP_DURATION);
|
||||
}
|
||||
}
|
||||
|
||||
// available resource
|
||||
Assert.assertEquals(resource.getMemory(),
|
||||
app.getCurrentConsumption().getMemory());
|
||||
Assert.assertEquals(resource.getVirtualCores(),
|
||||
app.getCurrentConsumption().getVirtualCores());
|
||||
}
|
||||
}
|
|
@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
|
||||
|
@ -31,13 +33,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
|
|||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
||||
private ControlledClock mockClock;
|
||||
|
@ -78,7 +84,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
|
|||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testSchedulingDelay() throws InterruptedException {
|
||||
public void testBasic() throws InterruptedException {
|
||||
// Add one node
|
||||
String host = "127.0.0.1";
|
||||
RMNode node1 = MockNodes.newNodeInfo(
|
||||
|
@ -88,8 +94,6 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
|
|||
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(nodeUpdateEvent);
|
||||
|
||||
// Create one application and submit one each of node-local, rack-local
|
||||
// and ANY requests
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
createMockRMApp(appAttemptId);
|
||||
|
@ -102,11 +106,69 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
|
|||
appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||
|
||||
// Advance time and let continuous scheduling kick in
|
||||
mockClock.tickSec(1);
|
||||
while (1024 != app.getCurrentConsumption().getMemory()) {
|
||||
Thread.sleep(100);
|
||||
triggerSchedulingAttempt();
|
||||
checkAppConsumption(app, Resources.createResource(1024, 1));
|
||||
}
|
||||
assertEquals(1024, app.getCurrentConsumption().getMemory());
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testSortedNodes() throws Exception {
|
||||
// Add two nodes
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
RMNode node2 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||
"127.0.0.2");
|
||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||
scheduler.handle(nodeEvent2);
|
||||
|
||||
// available resource
|
||||
Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024);
|
||||
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
|
||||
|
||||
// send application request
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
createMockRMApp(appAttemptId);
|
||||
|
||||
scheduler.addApplication(appAttemptId.getApplicationId(),
|
||||
"queue11", "user11", false);
|
||||
scheduler.addApplicationAttempt(appAttemptId, false, false);
|
||||
List<ResourceRequest> ask = new ArrayList<>();
|
||||
ResourceRequest request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||
ask.add(request);
|
||||
scheduler.allocate(appAttemptId, ask,
|
||||
new ArrayList<ContainerId>(), null, null, null, null);
|
||||
triggerSchedulingAttempt();
|
||||
|
||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||
checkAppConsumption(app, Resources.createResource(1024, 1));
|
||||
|
||||
// another request
|
||||
request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
||||
ask.clear();
|
||||
ask.add(request);
|
||||
scheduler.allocate(appAttemptId, ask,
|
||||
new ArrayList<ContainerId>(), null, null, null, null);
|
||||
triggerSchedulingAttempt();
|
||||
|
||||
checkAppConsumption(app, Resources.createResource(2048,2));
|
||||
|
||||
// 2 containers should be assigned to 2 nodes
|
||||
Set<NodeId> nodes = new HashSet<NodeId>();
|
||||
Iterator<RMContainer> it = app.getLiveContainers().iterator();
|
||||
while (it.hasNext()) {
|
||||
nodes.add(it.next().getContainer().getNodeId());
|
||||
}
|
||||
Assert.assertEquals(2, nodes.size());
|
||||
}
|
||||
|
||||
private void triggerSchedulingAttempt() {
|
||||
mockClock.tickMsec(
|
||||
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3738,81 +3738,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testContinuousScheduling() throws Exception {
|
||||
// set continuous scheduling enabled
|
||||
scheduler = new FairScheduler();
|
||||
Configuration conf = createConfiguration();
|
||||
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
|
||||
true);
|
||||
scheduler.setRMContext(resourceManager.getRMContext());
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
Assert.assertTrue("Continuous scheduling should be enabled.",
|
||||
scheduler.isContinuousSchedulingEnabled());
|
||||
|
||||
// Add two nodes
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
RMNode node2 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||
"127.0.0.2");
|
||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||
scheduler.handle(nodeEvent2);
|
||||
|
||||
// available resource
|
||||
Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024);
|
||||
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
|
||||
|
||||
// send application request
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
createMockRMApp(appAttemptId);
|
||||
|
||||
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
|
||||
scheduler.addApplicationAttempt(appAttemptId, false, false);
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||
ask.add(request);
|
||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
||||
|
||||
// waiting for continuous_scheduler_sleep_time
|
||||
// at least one pass
|
||||
Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500);
|
||||
|
||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||
// Wait until app gets resources.
|
||||
while (app.getCurrentConsumption().equals(Resources.none())) { }
|
||||
|
||||
// check consumption
|
||||
Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
|
||||
Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
|
||||
|
||||
// another request
|
||||
request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
||||
ask.clear();
|
||||
ask.add(request);
|
||||
scheduler.stop();
|
||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
||||
scheduler.continuousSchedulingAttempt();
|
||||
Assert.assertEquals(2048, app.getCurrentConsumption().getMemory());
|
||||
Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores());
|
||||
|
||||
// 2 containers should be assigned to 2 nodes
|
||||
Set<NodeId> nodes = new HashSet<NodeId>();
|
||||
Iterator<RMContainer> it = app.getLiveContainers().iterator();
|
||||
while (it.hasNext()) {
|
||||
nodes.add(it.next().getContainer().getNodeId());
|
||||
}
|
||||
Assert.assertEquals(2, nodes.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContinuousSchedulingWithNodeRemoved() throws Exception {
|
||||
// Disable continuous scheduling, will invoke continuous scheduling once manually
|
||||
|
|
Loading…
Reference in New Issue