YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from node-heartbeats. Contributed by Arun C Murthy.
svn merge --ignore-ancestry -c 1578722 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1578724 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f8d157cda8
commit
0c6db6af6e
|
@ -297,6 +297,9 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1824. Improved NodeManager and clients to be able to handle cross
|
||||
platform application submissions. (Jian He via vinodkv)
|
||||
|
||||
YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from
|
||||
node-heartbeats. (Arun C Murthy via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -172,6 +172,12 @@
|
|||
</Or>
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!-- Inconsistent sync warning - scheduleAsynchronously is only initialized once and never changed -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler" />
|
||||
<Field name="scheduleAsynchronously" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!-- Inconsistent sync warning - minimumAllocation is only initialized once and never changed -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler" />
|
||||
|
|
|
@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -194,6 +197,18 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
private ResourceCalculator calculator;
|
||||
private boolean usePortForNodeName;
|
||||
|
||||
private boolean scheduleAsynchronously;
|
||||
private AsyncScheduleThread asyncSchedulerThread;
|
||||
|
||||
/**
|
||||
* EXPERT
|
||||
*/
|
||||
private long asyncScheduleInterval;
|
||||
private static final String ASYNC_SCHEDULER_INTERVAL =
|
||||
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
|
||||
+ ".scheduling-interval-ms";
|
||||
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
|
||||
|
||||
public CapacityScheduler() {}
|
||||
|
||||
@Override
|
||||
|
@ -272,11 +287,23 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
|
||||
initializeQueues(this.conf);
|
||||
|
||||
scheduleAsynchronously = this.conf.getScheduleAynschronously();
|
||||
asyncScheduleInterval =
|
||||
this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
|
||||
DEFAULT_ASYNC_SCHEDULER_INTERVAL);
|
||||
if (scheduleAsynchronously) {
|
||||
asyncSchedulerThread = new AsyncScheduleThread(this);
|
||||
asyncSchedulerThread.start();
|
||||
}
|
||||
|
||||
initialized = true;
|
||||
LOG.info("Initialized CapacityScheduler with " +
|
||||
"calculator=" + getResourceCalculator().getClass() + ", " +
|
||||
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
|
||||
"maximumAllocation=<" + getMaximumResourceCapability() + ">");
|
||||
"maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
|
||||
"asynchronousScheduling=" + scheduleAsynchronously + ", " +
|
||||
"asyncScheduleInterval=" + asyncScheduleInterval + "ms");
|
||||
|
||||
} else {
|
||||
CapacitySchedulerConfiguration oldConf = this.conf;
|
||||
this.conf = loadCapacitySchedulerConfiguration(configuration);
|
||||
|
@ -290,7 +317,69 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
long getAsyncScheduleInterval() {
|
||||
return asyncScheduleInterval;
|
||||
}
|
||||
|
||||
private final static Random random = new Random(System.currentTimeMillis());
|
||||
|
||||
/**
|
||||
* Schedule on all nodes by starting at a random point.
|
||||
* @param cs
|
||||
*/
|
||||
static void schedule(CapacityScheduler cs) {
|
||||
// First randomize the start point
|
||||
int current = 0;
|
||||
Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values();
|
||||
int start = random.nextInt(nodes.size());
|
||||
for (FiCaSchedulerNode node : nodes) {
|
||||
if (current++ >= start) {
|
||||
cs.allocateContainersToNode(node);
|
||||
}
|
||||
}
|
||||
// Now, just get everyone to be safe
|
||||
for (FiCaSchedulerNode node : nodes) {
|
||||
cs.allocateContainersToNode(node);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(cs.getAsyncScheduleInterval());
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
|
||||
static class AsyncScheduleThread extends Thread {
|
||||
|
||||
private final CapacityScheduler cs;
|
||||
private AtomicBoolean runSchedules = new AtomicBoolean(false);
|
||||
|
||||
public AsyncScheduleThread(CapacityScheduler cs) {
|
||||
this.cs = cs;
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
if (!runSchedules.get()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {}
|
||||
} else {
|
||||
schedule(cs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void beginSchedule() {
|
||||
runSchedules.set(true);
|
||||
}
|
||||
|
||||
public void suspendSchedule() {
|
||||
runSchedules.set(false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Private
|
||||
public static final String ROOT_QUEUE =
|
||||
CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT;
|
||||
|
@ -696,6 +785,9 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
LOG.debug("Node being looked for scheduling " + nm
|
||||
+ " availableResource: " + node.getAvailableResource());
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
||||
|
||||
// Assign new containers...
|
||||
// 1. Check for reserved applications
|
||||
|
@ -708,7 +800,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
|
||||
// Try to fulfill the reservation
|
||||
LOG.info("Trying to fulfill reservation for application " +
|
||||
reservedApplication.getApplicationId() + " on node: " + nm);
|
||||
reservedApplication.getApplicationId() + " on node: " +
|
||||
node.getNodeID());
|
||||
|
||||
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
|
||||
CSAssignment assignment = queue.assignContainers(clusterResource, node);
|
||||
|
@ -729,9 +822,16 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
|
||||
// Try to schedule more if there are no reservations to fulfill
|
||||
if (node.getReservedContainer() == null) {
|
||||
root.assignContainers(clusterResource, node);
|
||||
if (Resources.greaterThanOrEqual(calculator, getClusterResources(),
|
||||
node.getAvailableResource(), minimumAllocation)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
|
||||
", available: " + node.getAvailableResource());
|
||||
}
|
||||
root.assignContainers(clusterResource, node);
|
||||
}
|
||||
} else {
|
||||
LOG.info("Skipping scheduling since node " + nm +
|
||||
LOG.info("Skipping scheduling since node " + node.getNodeID() +
|
||||
" is reserved by application " +
|
||||
node.getReservedContainer().getContainerId().getApplicationAttemptId()
|
||||
);
|
||||
|
@ -772,7 +872,11 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
case NODE_UPDATE:
|
||||
{
|
||||
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
||||
nodeUpdate(nodeUpdatedEvent.getRMNode());
|
||||
RMNode node = nodeUpdatedEvent.getRMNode();
|
||||
nodeUpdate(node);
|
||||
if (!scheduleAsynchronously) {
|
||||
allocateContainersToNode(getNode(node.getNodeID()));
|
||||
}
|
||||
}
|
||||
break;
|
||||
case APP_ADDED:
|
||||
|
@ -831,6 +935,10 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
++numNodeManagers;
|
||||
LOG.info("Added node " + nodeManager.getNodeAddress() +
|
||||
" clusterResource: " + clusterResource);
|
||||
|
||||
if (scheduleAsynchronously && numNodeManagers == 1) {
|
||||
asyncSchedulerThread.beginSchedule();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void removeNode(RMNode nodeInfo) {
|
||||
|
@ -842,6 +950,10 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
root.updateClusterResource(clusterResource);
|
||||
--numNodeManagers;
|
||||
|
||||
if (scheduleAsynchronously && numNodeManagers == 0) {
|
||||
asyncSchedulerThread.suspendSchedule();
|
||||
}
|
||||
|
||||
// Remove running containers
|
||||
List<RMContainer> runningContainers = node.getRunningContainers();
|
||||
for (RMContainer container : runningContainers) {
|
||||
|
@ -931,7 +1043,12 @@ public class CapacityScheduler extends AbstractYarnScheduler
|
|||
FiCaSchedulerNode getNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
}
|
||||
|
||||
|
||||
@Lock(Lock.NoLock.class)
|
||||
Map<NodeId, FiCaSchedulerNode> getAllNodes() {
|
||||
return nodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMContainer getRMContainer(ContainerId containerId) {
|
||||
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
|
||||
|
|
|
@ -135,6 +135,17 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
|||
@Private
|
||||
public static final int DEFAULT_NODE_LOCALITY_DELAY = -1;
|
||||
|
||||
@Private
|
||||
public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
|
||||
PREFIX + "schedule-asynchronously";
|
||||
|
||||
@Private
|
||||
public static final String SCHEDULE_ASYNCHRONOUSLY_ENABLE =
|
||||
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable";
|
||||
|
||||
@Private
|
||||
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
|
||||
|
||||
public CapacitySchedulerConfiguration() {
|
||||
this(new Configuration());
|
||||
}
|
||||
|
@ -357,4 +368,14 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
|||
resourceCalculatorClass,
|
||||
ResourceCalculator.class);
|
||||
}
|
||||
|
||||
public boolean getScheduleAynschronously() {
|
||||
return getBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE,
|
||||
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE);
|
||||
}
|
||||
|
||||
public void setScheduleAynschronously(boolean async) {
|
||||
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,20 +19,15 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
|
@ -61,11 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.Task;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
|
@ -650,4 +642,30 @@ public class TestCapacityScheduler {
|
|||
cs.getSchedulerApplications(), cs, "a1");
|
||||
Assert.assertEquals("a1", app.getQueue().getQueueName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncScheduling() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
final int NODES = 100;
|
||||
|
||||
// Register nodes
|
||||
for (int i=0; i < NODES; ++i) {
|
||||
String host = "192.168.1." + i;
|
||||
RMNode node =
|
||||
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
|
||||
cs.handle(new NodeAddedSchedulerEvent(node));
|
||||
}
|
||||
|
||||
// Now directly exercise the scheduling loop
|
||||
for (int i=0; i < NODES; ++i) {
|
||||
CapacityScheduler.schedule(cs);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue