YARN-4812. TestFairScheduler#testContinuousScheduling fails intermittently. (kasha)
(cherry picked from commit f84af8bd588763c4e99305742d8c86ed596e8359)
This commit is contained in:
parent
365d236175
commit
bbe9bb078c
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ -50,7 +51,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
public class FairSchedulerTestBase {
|
public class FairSchedulerTestBase {
|
||||||
public final static String TEST_DIR =
|
public final static String TEST_DIR =
|
||||||
@ -66,6 +67,8 @@ public class FairSchedulerTestBase {
|
|||||||
protected FairScheduler scheduler;
|
protected FairScheduler scheduler;
|
||||||
protected ResourceManager resourceManager;
|
protected ResourceManager resourceManager;
|
||||||
public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
|
public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
|
||||||
|
private static final int SLEEP_DURATION = 10;
|
||||||
|
private static final int SLEEP_RETRIES = 1000;
|
||||||
|
|
||||||
// Helper methods
|
// Helper methods
|
||||||
public Configuration createConfiguration() {
|
public Configuration createConfiguration() {
|
||||||
@ -260,4 +263,21 @@ protected RMApp createMockRMApp(ApplicationAttemptId attemptId) {
|
|||||||
.put(attemptId.getApplicationId(), app);
|
.put(attemptId.getApplicationId(), app);
|
||||||
return app;
|
return app;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void checkAppConsumption(FSAppAttempt app, Resource resource)
|
||||||
|
throws InterruptedException {
|
||||||
|
for (int i = 0; i < SLEEP_RETRIES; i++) {
|
||||||
|
if (Resources.equals(resource, app.getCurrentConsumption())) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
Thread.sleep(SLEEP_DURATION);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check consumption: the app's current usage must match the expected resource
|
||||||
|
Assert.assertEquals(resource.getMemory(),
|
||||||
|
app.getCurrentConsumption().getMemory());
|
||||||
|
Assert.assertEquals(resource.getVirtualCores(),
|
||||||
|
app.getCurrentConsumption().getVirtualCores());
|
||||||
|
}
|
||||||
}
|
}
|
@ -21,9 +21,11 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
|
|
||||||
@ -31,13 +33,17 @@
|
|||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
||||||
private ControlledClock mockClock;
|
private ControlledClock mockClock;
|
||||||
@ -78,7 +84,7 @@ public void teardown() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testSchedulingDelay() throws InterruptedException {
|
public void testBasic() throws InterruptedException {
|
||||||
// Add one node
|
// Add one node
|
||||||
String host = "127.0.0.1";
|
String host = "127.0.0.1";
|
||||||
RMNode node1 = MockNodes.newNodeInfo(
|
RMNode node1 = MockNodes.newNodeInfo(
|
||||||
@ -88,8 +94,6 @@ public void testSchedulingDelay() throws InterruptedException {
|
|||||||
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
|
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||||
scheduler.handle(nodeUpdateEvent);
|
scheduler.handle(nodeUpdateEvent);
|
||||||
|
|
||||||
// Create one application and submit one each of node-local, rack-local
|
|
||||||
// and ANY requests
|
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
createMockRMApp(appAttemptId);
|
createMockRMApp(appAttemptId);
|
||||||
@ -102,11 +106,69 @@ public void testSchedulingDelay() throws InterruptedException {
|
|||||||
appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
||||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||||
|
|
||||||
// Advance time and let continuous scheduling kick in
|
triggerSchedulingAttempt();
|
||||||
mockClock.tickSec(1);
|
checkAppConsumption(app, Resources.createResource(1024, 1));
|
||||||
while (1024 != app.getCurrentConsumption().getMemory()) {
|
}
|
||||||
Thread.sleep(100);
|
|
||||||
|
@Test (timeout = 10000)
|
||||||
|
public void testSortedNodes() throws Exception {
|
||||||
|
// Add two nodes
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||||
|
"127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
RMNode node2 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||||
|
"127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
|
// available resource
|
||||||
|
Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024);
|
||||||
|
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
|
||||||
|
|
||||||
|
// send application request
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
|
createMockRMApp(appAttemptId);
|
||||||
|
|
||||||
|
scheduler.addApplication(appAttemptId.getApplicationId(),
|
||||||
|
"queue11", "user11", false);
|
||||||
|
scheduler.addApplicationAttempt(appAttemptId, false, false);
|
||||||
|
List<ResourceRequest> ask = new ArrayList<>();
|
||||||
|
ResourceRequest request =
|
||||||
|
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||||
|
ask.add(request);
|
||||||
|
scheduler.allocate(appAttemptId, ask,
|
||||||
|
new ArrayList<ContainerId>(), null, null, null, null);
|
||||||
|
triggerSchedulingAttempt();
|
||||||
|
|
||||||
|
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
||||||
|
checkAppConsumption(app, Resources.createResource(1024, 1));
|
||||||
|
|
||||||
|
// another request
|
||||||
|
request =
|
||||||
|
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
||||||
|
ask.clear();
|
||||||
|
ask.add(request);
|
||||||
|
scheduler.allocate(appAttemptId, ask,
|
||||||
|
new ArrayList<ContainerId>(), null, null, null, null);
|
||||||
|
triggerSchedulingAttempt();
|
||||||
|
|
||||||
|
checkAppConsumption(app, Resources.createResource(2048,2));
|
||||||
|
|
||||||
|
// 2 containers should be assigned to 2 nodes
|
||||||
|
Set<NodeId> nodes = new HashSet<NodeId>();
|
||||||
|
Iterator<RMContainer> it = app.getLiveContainers().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
nodes.add(it.next().getContainer().getNodeId());
|
||||||
}
|
}
|
||||||
assertEquals(1024, app.getCurrentConsumption().getMemory());
|
Assert.assertEquals(2, nodes.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void triggerSchedulingAttempt() {
|
||||||
|
mockClock.tickMsec(
|
||||||
|
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3738,81 +3738,6 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
|
|||||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 10000)
|
|
||||||
public void testContinuousScheduling() throws Exception {
|
|
||||||
// set continuous scheduling enabled
|
|
||||||
scheduler = new FairScheduler();
|
|
||||||
Configuration conf = createConfiguration();
|
|
||||||
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
|
|
||||||
true);
|
|
||||||
scheduler.setRMContext(resourceManager.getRMContext());
|
|
||||||
scheduler.init(conf);
|
|
||||||
scheduler.start();
|
|
||||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
||||||
Assert.assertTrue("Continuous scheduling should be enabled.",
|
|
||||||
scheduler.isContinuousSchedulingEnabled());
|
|
||||||
|
|
||||||
// Add two nodes
|
|
||||||
RMNode node1 =
|
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
|
||||||
"127.0.0.1");
|
|
||||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
||||||
scheduler.handle(nodeEvent1);
|
|
||||||
RMNode node2 =
|
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
|
||||||
"127.0.0.2");
|
|
||||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
||||||
scheduler.handle(nodeEvent2);
|
|
||||||
|
|
||||||
// available resource
|
|
||||||
Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024);
|
|
||||||
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
|
|
||||||
|
|
||||||
// send application request
|
|
||||||
ApplicationAttemptId appAttemptId =
|
|
||||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
|
||||||
createMockRMApp(appAttemptId);
|
|
||||||
|
|
||||||
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
|
|
||||||
scheduler.addApplicationAttempt(appAttemptId, false, false);
|
|
||||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
|
||||||
ResourceRequest request =
|
|
||||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
|
||||||
ask.add(request);
|
|
||||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
|
||||||
|
|
||||||
// waiting for continuous_scheduler_sleep_time
|
|
||||||
// at least one pass
|
|
||||||
Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500);
|
|
||||||
|
|
||||||
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
|
|
||||||
// Wait until app gets resources.
|
|
||||||
while (app.getCurrentConsumption().equals(Resources.none())) { }
|
|
||||||
|
|
||||||
// check consumption
|
|
||||||
Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
|
|
||||||
Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
|
|
||||||
|
|
||||||
// another request
|
|
||||||
request =
|
|
||||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
|
|
||||||
ask.clear();
|
|
||||||
ask.add(request);
|
|
||||||
scheduler.stop();
|
|
||||||
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
|
|
||||||
scheduler.continuousSchedulingAttempt();
|
|
||||||
Assert.assertEquals(2048, app.getCurrentConsumption().getMemory());
|
|
||||||
Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores());
|
|
||||||
|
|
||||||
// 2 containers should be assigned to 2 nodes
|
|
||||||
Set<NodeId> nodes = new HashSet<NodeId>();
|
|
||||||
Iterator<RMContainer> it = app.getLiveContainers().iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
nodes.add(it.next().getContainer().getNodeId());
|
|
||||||
}
|
|
||||||
Assert.assertEquals(2, nodes.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContinuousSchedulingWithNodeRemoved() throws Exception {
|
public void testContinuousSchedulingWithNodeRemoved() throws Exception {
|
||||||
// Disable continuous scheduling, will invoke continuous scheduling once manually
|
// Disable continuous scheduling, will invoke continuous scheduling once manually
|
||||||
|
Loading…
x
Reference in New Issue
Block a user