YARN-4161. Capacity Scheduler : Assign single or multiple containers per heart beat driven by configuration. (Wei Yan via wangda)

Change-Id: Ic441ae4e0bf72e7232411eb54243ec143d5fd0d3
This commit is contained in:
Wangda Tan 2017-08-07 11:32:12 -07:00
parent a3a9c976c3
commit adb84f34db
3 changed files with 289 additions and 19 deletions

View File

@ -94,11 +94,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidExcep
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@ -163,6 +161,9 @@ public class CapacityScheduler extends
private int offswitchPerHeartbeatLimit; private int offswitchPerHeartbeatLimit;
private boolean assignMultipleEnabled;
private int maxAssignPerHeartbeat;
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
@ -308,6 +309,9 @@ public class CapacityScheduler extends
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL); DEFAULT_ASYNC_SCHEDULER_INTERVAL);
this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
// number of threads for async scheduling // number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt( int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
@ -1109,17 +1113,29 @@ public class CapacityScheduler extends
.getAssignmentInformation().getReserved()); .getAssignmentInformation().getReserved());
} }
private boolean canAllocateMore(CSAssignment assignment, int offswitchCount) { private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
if (null != assignment && Resources.greaterThan(getResourceCalculator(), int assignedContainers) {
getClusterResource(), assignment.getResource(), Resources.none()) // Current assignment shouldn't be empty
&& offswitchCount < offswitchPerHeartbeatLimit) { if (assignment == null
// And it should not be a reserved container || Resources.equals(assignment.getResource(), Resources.none())) {
if (assignment.getAssignmentInformation().getNumReservations() == 0) { return false;
return true;
}
} }
return false; // offswitch assignment should be under threshold
if (offswitchCount >= offswitchPerHeartbeatLimit) {
return false;
}
// And it should not be a reserved container
if (assignment.getAssignmentInformation().getNumReservations() > 0) {
return false;
}
// assignMultipleEnabled should be ON,
// and assignedContainers should be under threshold
return assignMultipleEnabled
&& (maxAssignPerHeartbeat == -1
|| assignedContainers < maxAssignPerHeartbeat);
} }
/** /**
@ -1131,6 +1147,7 @@ public class CapacityScheduler extends
FiCaSchedulerNode node = getNode(nodeId); FiCaSchedulerNode node = getNode(nodeId);
if (null != node) { if (null != node) {
int offswitchCount = 0; int offswitchCount = 0;
int assignedContainers = 0;
PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node); PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat); CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
@ -1141,7 +1158,13 @@ public class CapacityScheduler extends
offswitchCount++; offswitchCount++;
} }
while (canAllocateMore(assignment, offswitchCount)) { if (Resources.greaterThan(calculator, getClusterResource(),
assignment.getResource(), Resources.none())) {
assignedContainers++;
}
while (canAllocateMore(assignment, offswitchCount,
assignedContainers)) {
// Try to see if it is possible to allocate multiple container for // Try to see if it is possible to allocate multiple container for
// the same node heartbeat // the same node heartbeat
assignment = allocateContainersToNode(ps, true); assignment = allocateContainersToNode(ps, true);
@ -1150,6 +1173,12 @@ public class CapacityScheduler extends
&& assignment.getType() == NodeType.OFF_SWITCH) { && assignment.getType() == NodeType.OFF_SWITCH) {
offswitchCount++; offswitchCount++;
} }
if (null != assignment
&& Resources.greaterThan(calculator, getClusterResource(),
assignment.getResource(), Resources.none())) {
assignedContainers++;
}
} }
if (offswitchCount >= offswitchPerHeartbeatLimit) { if (offswitchCount >= offswitchPerHeartbeatLimit) {

View File

@ -301,6 +301,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private @Private
public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
@Private
public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
+ "per-node-heartbeat.multiple-assignments-enabled";
@Private
public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true;
/** Maximum number of containers to assign on each check-in. */
@Private
public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX
+ "per-node-heartbeat.maximum-container-assignments";
@Private
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
public CapacitySchedulerConfiguration() { public CapacitySchedulerConfiguration() {
@ -1473,4 +1488,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
} }
return userWeights; return userWeights;
} }
public boolean getAssignMultipleEnabled() {
return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED);
}
public int getMaxAssignPerHeartbeat() {
return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
}
} }

View File

@ -233,6 +233,17 @@ public class TestCapacityScheduler {
} }
} }
private NodeManager registerNode(ResourceManager rm, String hostName,
int containerManagerPort, int httpPort, String rackName,
Resource capability) throws IOException, YarnException {
NodeManager nm = new NodeManager(hostName,
containerManagerPort, httpPort, rackName, capability, rm);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
.get(nm.getNodeId()));
rm.getResourceScheduler().handle(nodeAddEvent1);
return nm;
}
@Test (timeout = 30000) @Test (timeout = 30000)
public void testConfValidation() throws Exception { public void testConfValidation() throws Exception {
@ -267,12 +278,12 @@ public class TestCapacityScheduler {
} }
} }
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager private NodeManager
registerNode(String hostName, int containerManagerPort, int httpPort, registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability) String rackName, Resource capability)
throws IOException, YarnException { throws IOException, YarnException {
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = NodeManager nm =
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( new NodeManager(
hostName, containerManagerPort, httpPort, rackName, capability, hostName, containerManagerPort, httpPort, rackName, capability,
resourceManager); resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
@ -400,8 +411,216 @@ public class TestCapacityScheduler {
LOG.info("--- END: testCapacityScheduler ---"); LOG.info("--- END: testCapacityScheduler ---");
} }
private void nodeUpdate( @Test
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) { public void testNotAssignMultiple() throws Exception {
LOG.info("--- START: testNotAssignMultiple ---");
ResourceManager rm = new ResourceManager() {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}
};
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setBoolean(
CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false);
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
rm.init(conf);
rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
RMContext mC = mock(RMContext.class);
when(mC.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider());
// Register node1
String host0 = "host_0";
NodeManager nm0 =
registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(10 * GB, 10));
// ResourceRequest priorities
Priority priority0 = Priority.newInstance(0);
Priority priority1 = Priority.newInstance(1);
// Submit an application
Application application0 = new Application("user_0", "a1", rm);
application0.submit();
application0.addNodeManager(host0, 1234, nm0);
Resource capability00 = Resources.createResource(1 * GB, 1);
application0.addResourceRequestSpec(priority0, capability00);
Resource capability01 = Resources.createResource(2 * GB, 1);
application0.addResourceRequestSpec(priority1, capability01);
Task task00 =
new Task(application0, priority0, new String[] {host0});
Task task01 =
new Task(application0, priority1, new String[] {host0});
application0.addTask(task00);
application0.addTask(task01);
// Submit another application
Application application1 = new Application("user_1", "b2", rm);
application1.submit();
application1.addNodeManager(host0, 1234, nm0);
Resource capability10 = Resources.createResource(3 * GB, 1);
application1.addResourceRequestSpec(priority0, capability10);
Resource capability11 = Resources.createResource(4 * GB, 1);
application1.addResourceRequestSpec(priority1, capability11);
Task task10 = new Task(application1, priority0, new String[] {host0});
Task task11 = new Task(application1, priority1, new String[] {host0});
application1.addTask(task10);
application1.addTask(task11);
// Send resource requests to the scheduler
application0.schedule();
application1.schedule();
// Send a heartbeat to kick the tires on the Scheduler
LOG.info("Kick!");
// task00, used=1G
nodeUpdate(rm, nm0);
// Get allocations from the scheduler
application0.schedule();
application1.schedule();
// 1 Task per heart beat should be scheduled
checkNodeResourceUsage(3 * GB, nm0); // task00 (1G)
checkApplicationResourceUsage(0 * GB, application0);
checkApplicationResourceUsage(3 * GB, application1);
// Another heartbeat
nodeUpdate(rm, nm0);
application0.schedule();
checkApplicationResourceUsage(1 * GB, application0);
application1.schedule();
checkApplicationResourceUsage(3 * GB, application1);
checkNodeResourceUsage(4 * GB, nm0);
LOG.info("--- START: testNotAssignMultiple ---");
}
@Test
public void testAssignMultiple() throws Exception {
LOG.info("--- START: testAssignMultiple ---");
ResourceManager rm = new ResourceManager() {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}
};
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setBoolean(
CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
// Each heartbeat will assign 2 containers at most
csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
rm.init(conf);
rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
RMContext mC = mock(RMContext.class);
when(mC.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider());
// Register node1
String host0 = "host_0";
NodeManager nm0 =
registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(10 * GB, 10));
// ResourceRequest priorities
Priority priority0 = Priority.newInstance(0);
Priority priority1 = Priority.newInstance(1);
// Submit an application
Application application0 = new Application("user_0", "a1", rm);
application0.submit();
application0.addNodeManager(host0, 1234, nm0);
Resource capability00 = Resources.createResource(1 * GB, 1);
application0.addResourceRequestSpec(priority0, capability00);
Resource capability01 = Resources.createResource(2 * GB, 1);
application0.addResourceRequestSpec(priority1, capability01);
Task task00 = new Task(application0, priority0, new String[] {host0});
Task task01 = new Task(application0, priority1, new String[] {host0});
application0.addTask(task00);
application0.addTask(task01);
// Submit another application
Application application1 = new Application("user_1", "b2", rm);
application1.submit();
application1.addNodeManager(host0, 1234, nm0);
Resource capability10 = Resources.createResource(3 * GB, 1);
application1.addResourceRequestSpec(priority0, capability10);
Resource capability11 = Resources.createResource(4 * GB, 1);
application1.addResourceRequestSpec(priority1, capability11);
Task task10 =
new Task(application1, priority0, new String[] {host0});
Task task11 =
new Task(application1, priority1, new String[] {host0});
application1.addTask(task10);
application1.addTask(task11);
// Send resource requests to the scheduler
application0.schedule();
application1.schedule();
// Send a heartbeat to kick the tires on the Scheduler
LOG.info("Kick!");
// task_0_0, used=1G
nodeUpdate(rm, nm0);
// Get allocations from the scheduler
application0.schedule();
application1.schedule();
// 1 Task per heart beat should be scheduled
checkNodeResourceUsage(4 * GB, nm0); // task00 (1G)
checkApplicationResourceUsage(1 * GB, application0);
checkApplicationResourceUsage(3 * GB, application1);
// Another heartbeat
nodeUpdate(rm, nm0);
application0.schedule();
checkApplicationResourceUsage(3 * GB, application0);
application1.schedule();
checkApplicationResourceUsage(7 * GB, application1);
checkNodeResourceUsage(10 * GB, nm0);
LOG.info("--- START: testAssignMultiple ---");
}
private void nodeUpdate(ResourceManager rm, NodeManager nm) {
RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
rm.getResourceScheduler().handle(nodeUpdate);
}
private void nodeUpdate(NodeManager nm) {
RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
@ -699,8 +918,7 @@ public class TestCapacityScheduler {
Assert.assertEquals(expected, application.getUsedResources().getMemorySize()); Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
} }
private void checkNodeResourceUsage(int expected, private void checkNodeResourceUsage(int expected, NodeManager node) {
org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) {
Assert.assertEquals(expected, node.getUsed().getMemorySize()); Assert.assertEquals(expected, node.getUsed().getMemorySize());
node.checkResourceUsage(); node.checkResourceUsage();
} }