YARN-2017. Merged some of the common scheduler code. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1596753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-05-22 05:32:26 +00:00
parent 6b2e615f5f
commit 82f3454f5a
28 changed files with 538 additions and 782 deletions

View File

@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -872,4 +874,11 @@ public class ResourceSchedulerWrapper implements
throws YarnException { throws YarnException {
return scheduler.moveApplication(appId, newQueue); return scheduler.moveApplication(appId, newQueue);
} }
@Override
@LimitedPrivate("yarn")
@Unstable
public Resource getClusterResource() {
return null;
}
} }

View File

@ -88,6 +88,8 @@ Release 2.5.0 - UNRELEASED
YARN-1938. Added kerberos login for the Timeline Server. (Zhijie Shen via YARN-1938. Added kerberos login for the Timeline Server. (Zhijie Shen via
vinodkv) vinodkv)
YARN-2017. Merged some of the common scheduler code. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -142,6 +142,17 @@
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService" /> <Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService" />
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<!-- Inconsistent sync warning - minimumAllocation is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler" />
<Field name="minimumAllocation" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode" />
<Method name="reserveResource" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!-- Inconsistent sync warning - reinitialize read from other queue does not need sync--> <!-- Inconsistent sync warning - reinitialize read from other queue does not need sync-->
<Match> <Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue" /> <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue" />
@ -178,12 +189,6 @@
<Field name="scheduleAsynchronously" /> <Field name="scheduleAsynchronously" />
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<!-- Inconsistent sync warning - minimumAllocation is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler" />
<Field name="minimumAllocation" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Inconsistent sync warning - numRetries is only initialized once and never changed --> <!-- Inconsistent sync warning - numRetries is only initialized once and never changed -->
<Match> <Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" /> <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />

View File

@ -170,7 +170,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public void editSchedule(){ public void editSchedule(){
CSQueue root = scheduler.getRootQueue(); CSQueue root = scheduler.getRootQueue();
Resource clusterResources = Resource clusterResources =
Resources.clone(scheduler.getClusterResources()); Resources.clone(scheduler.getClusterResource());
containerBasedPreemptOrKill(root, clusterResources); containerBasedPreemptOrKill(root, clusterResources);
} }

View File

@ -22,21 +22,41 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
public abstract class AbstractYarnScheduler implements ResourceScheduler { public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
// Nodes in the cluster, indexed by NodeId
protected Map<NodeId, N> nodes =
new ConcurrentHashMap<NodeId, N>();
// Whole capacity of the cluster
protected Resource clusterResource = Resource.newInstance(0, 0);
protected Resource minimumAllocation;
protected Resource maximumAllocation;
protected RMContext rmContext; protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication> applications; protected Map<ApplicationId, SchedulerApplication<T>> applications;
protected final static List<Container> EMPTY_CONTAINER_LIST = protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>(); new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation( protected static final Allocation EMPTY_ALLOCATION = new Allocation(
@ -45,7 +65,7 @@ public abstract class AbstractYarnScheduler implements ResourceScheduler {
public synchronized List<Container> getTransferredContainers( public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) { ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId(); ApplicationId appId = currentAttempt.getApplicationId();
SchedulerApplication app = applications.get(appId); SchedulerApplication<T> app = applications.get(appId);
List<Container> containerList = new ArrayList<Container>(); List<Container> containerList = new ArrayList<Container>();
RMApp appImpl = this.rmContext.getRMApps().get(appId); RMApp appImpl = this.rmContext.getRMApps().get(appId);
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
@ -64,10 +84,75 @@ public abstract class AbstractYarnScheduler implements ResourceScheduler {
return containerList; return containerList;
} }
public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() { public Map<ApplicationId, SchedulerApplication<T>>
getSchedulerApplications() {
return applications; return applications;
} }
@Override
public Resource getClusterResource() {
return clusterResource;
}
@Override
public Resource getMinimumResourceCapability() {
return minimumAllocation;
}
@Override
public Resource getMaximumResourceCapability() {
return maximumAllocation;
}
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication<T> app =
applications.get(applicationAttemptId.getApplicationId());
return app == null ? null : app.getCurrentAppAttempt();
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
if (attempt == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
}
return null;
}
return new SchedulerAppReport(attempt);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
if (attempt == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
}
return null;
}
return attempt.getResourceUsageReport();
}
public T getCurrentAttemptForContainer(ContainerId containerId) {
return getApplicationAttempt(containerId.getApplicationAttemptId());
}
@Override
public RMContainer getRMContainer(ContainerId containerId) {
SchedulerApplicationAttempt attempt =
getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
N node = nodes.get(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
@Override @Override
public String moveApplication(ApplicationId appId, String newQueue) public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException { throws YarnException {

View File

@ -23,11 +23,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@Private @Private
@Unstable @Unstable
public class SchedulerApplication { public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private Queue queue; private Queue queue;
private final String user; private final String user;
private SchedulerApplicationAttempt currentAttempt; private T currentAttempt;
public SchedulerApplication(Queue queue, String user) { public SchedulerApplication(Queue queue, String user) {
this.queue = queue; this.queue = queue;
@ -46,11 +46,11 @@ public class SchedulerApplication {
return user; return user;
} }
public SchedulerApplicationAttempt getCurrentAppAttempt() { public T getCurrentAppAttempt() {
return currentAttempt; return currentAttempt;
} }
public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) { public void setCurrentAppAttempt(T currentAttempt) {
this.currentAttempt = currentAttempt; this.currentAttempt = currentAttempt;
} }

View File

@ -18,11 +18,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
/** /**
* Represents a YARN Cluster Node from the viewpoint of the scheduler. * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@ -31,6 +47,50 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
@Unstable @Unstable
public abstract class SchedulerNode { public abstract class SchedulerNode {
private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
private Resource availableResource = Resource.newInstance(0, 0);
private Resource usedResource = Resource.newInstance(0, 0);
private Resource totalResourceCapability;
private RMContainer reservedContainer;
private volatile int numContainers;
/* set of containers that are allocated containers */
private final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
private final String nodeName;
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
this.rmNode = node;
this.availableResource = Resources.clone(node.getTotalCapability());
this.totalResourceCapability = Resources.clone(node.getTotalCapability());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
nodeName = rmNode.getHostName();
}
}
public RMNode getRMNode() {
return this.rmNode;
}
/**
* Get the ID of the node which contains both its hostname and port.
*
* @return the ID of the node
*/
public NodeId getNodeID() {
return this.rmNode.getNodeID();
}
public String getHttpAddress() {
return this.rmNode.getHttpAddress();
}
/** /**
* Get the name of the node for scheduling matching decisions. * Get the name of the node for scheduling matching decisions.
* <p/> * <p/>
@ -42,48 +102,176 @@ public abstract class SchedulerNode {
* *
* @return name of the node for scheduling matching decisions. * @return name of the node for scheduling matching decisions.
*/ */
public abstract String getNodeName(); public String getNodeName() {
return nodeName;
}
/** /**
* Get rackname. * Get rackname.
*
* @return rackname * @return rackname
*/ */
public abstract String getRackName(); public String getRackName() {
return this.rmNode.getRackName();
}
/** /**
* Get used resources on the node. * The Scheduler has allocated containers on this node to the given
* @return used resources on the node * application.
*
* @param applicationId
* application
* @param rmContainer
* allocated container
*/ */
public abstract Resource getUsedResource(); public synchronized void allocateContainer(ApplicationId applicationId,
RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
LOG.info("Assigned container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress()
+ ", which currently has " + numContainers + " containers, "
+ getUsedResource() + " used and " + getAvailableResource()
+ " available");
}
/** /**
* Get available resources on the node. * Get available resources on the node.
*
* @return available resources on the node * @return available resources on the node
*/ */
public abstract Resource getAvailableResource(); public synchronized Resource getAvailableResource() {
return this.availableResource;
}
/** /**
* Get number of active containers on the node. * Get used resources on the node.
* @return number of active containers on the node *
* @return used resources on the node
*/ */
public abstract int getNumContainers(); public synchronized Resource getUsedResource() {
return this.usedResource;
/** }
* Apply delta resource on node's available resource.
* @param deltaResource the delta of resource need to apply to node
*/
public abstract void applyDeltaOnAvailableResource(Resource deltaResource);
/** /**
* Get total resources on the node. * Get total resources on the node.
*
* @return total resources on the node. * @return total resources on the node.
*/ */
public abstract Resource getTotalResource(); public Resource getTotalResource() {
return this.totalResourceCapability;
}
private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId())) {
return true;
}
return false;
}
private synchronized void updateResource(Container container) {
addAvailableResource(container.getResource());
--numContainers;
}
/** /**
* Get the ID of the node which contains both its hostname and port. * Release an allocated container on this node.
* @return the ID of the node *
* @param container
* container to be released
*/ */
public abstract NodeId getNodeID(); public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container)) {
LOG.error("Invalid container released " + container);
return;
}
/* remove the containers from the nodemanger */
if (null != launchedContainers.remove(container.getId())) {
updateResource(container);
}
LOG.info("Released container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress()
+ ", which currently has " + numContainers + " containers, "
+ getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
}
private synchronized void addAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.addTo(availableResource, resource);
Resources.subtractFrom(usedResource, resource);
}
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.subtractFrom(availableResource, resource);
Resources.addTo(usedResource, resource);
}
/**
* Reserve container for the attempt on this node.
*/
public abstract void reserveResource(SchedulerApplicationAttempt attempt,
Priority priority, RMContainer container);
/**
* Unreserve resources on this node.
*/
public abstract void unreserveResource(SchedulerApplicationAttempt attempt);
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers="
+ getNumContainers() + " available="
+ getAvailableResource().getMemory() + " used="
+ getUsedResource().getMemory();
}
/**
* Get number of active containers on the node.
*
* @return number of active containers on the node
*/
public int getNumContainers() {
return numContainers;
}
public synchronized List<RMContainer> getRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
}
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
}
protected synchronized void
setReservedContainer(RMContainer reservedContainer) {
this.reservedContainer = reservedContainer;
}
/**
* Apply delta resource on node's available resource.
*
* @param deltaResource
* the delta of resource need to apply to node
*/
public synchronized void
applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
} }

View File

@ -70,6 +70,14 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
@Stable @Stable
public List<QueueUserACLInfo> getQueueUserAclInfo(); public List<QueueUserACLInfo> getQueueUserAclInfo();
/**
* Get the whole resource capacity of the cluster.
* @return the whole resource capacity of the cluster.
*/
@LimitedPrivate("yarn")
@Unstable
public Resource getClusterResource();
/** /**
* Get minimum allocatable {@link Resource}. * Get minimum allocatable {@link Resource}.
* @return minimum allocatable resource * @return minimum allocatable resource

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -49,11 +48,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -75,9 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -100,9 +95,9 @@ import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Evolving @Evolving
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class CapacityScheduler extends AbstractYarnScheduler public class CapacityScheduler extends
implements PreemptableResourceScheduler, CapacitySchedulerContext, AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
Configurable { PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
@ -182,16 +177,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>(); private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
private Map<NodeId, FiCaSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private Resource clusterResource =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
private int numNodeManagers = 0; private int numNodeManagers = 0;
private Resource minimumAllocation;
private Resource maximumAllocation;
private boolean initialized = false; private boolean initialized = false;
private ResourceCalculator calculator; private ResourceCalculator calculator;
@ -230,16 +217,6 @@ public class CapacityScheduler extends AbstractYarnScheduler
return this.rmContext.getContainerTokenSecretManager(); return this.rmContext.getContainerTokenSecretManager();
} }
@Override
public Resource getMinimumResourceCapability() {
return minimumAllocation;
}
@Override
public Resource getMaximumResourceCapability() {
return maximumAllocation;
}
@Override @Override
public Comparator<FiCaSchedulerApp> getApplicationComparator() { public Comparator<FiCaSchedulerApp> getApplicationComparator() {
return applicationComparator; return applicationComparator;
@ -265,11 +242,6 @@ public class CapacityScheduler extends AbstractYarnScheduler
return this.rmContext; return this.rmContext;
} }
@Override
public Resource getClusterResources() {
return clusterResource;
}
@Override @Override
public synchronized void public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException { reinitialize(Configuration conf, RMContext rmContext) throws IOException {
@ -283,7 +255,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
this.calculator = this.conf.getResourceCalculator(); this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = this.applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>(); new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
initializeQueues(this.conf); initializeQueues(this.conf);
@ -536,8 +508,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
.handle(new RMAppRejectedEvent(applicationId, ace.toString())); .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
return; return;
} }
SchedulerApplication application = SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication(queue, user); new SchedulerApplication<FiCaSchedulerApp>(queue, user);
applications.put(applicationId, application); applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName); + ", in queue: " + queueName);
@ -548,7 +520,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
private synchronized void addApplicationAttempt( private synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) { boolean transferStateFromPreviousAttempt) {
SchedulerApplication application = SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue(); CSQueue queue = (CSQueue) application.getQueue();
@ -572,7 +544,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
private synchronized void doneApplication(ApplicationId applicationId, private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) { RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId); SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationId);
if (application == null){ if (application == null){
// The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
// ignore it. // ignore it.
@ -597,7 +570,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
" finalState=" + rmAppAttemptFinalState); " finalState=" + rmAppAttemptFinalState);
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
SchedulerApplication application = SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) { if (application == null || attempt == null) {
@ -659,7 +632,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
// Sanity check // Sanity check
SchedulerUtils.normalizeRequests( SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResources(), ask, getResourceCalculator(), getClusterResource(),
getMinimumResourceCapability(), maximumAllocation); getMinimumResourceCapability(), maximumAllocation);
// Release containers // Release containers
@ -822,7 +795,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
// Try to schedule more if there are no reservations to fulfill // Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) { if (node.getReservedContainer() == null) {
if (Resources.greaterThanOrEqual(calculator, getClusterResources(), if (Resources.greaterThanOrEqual(calculator, getClusterResource(),
node.getAvailableResource(), minimumAllocation)) { node.getAvailableResource(), minimumAllocation)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() + LOG.debug("Trying to schedule on node: " + node.getNodeName() +
@ -942,7 +915,7 @@ public class CapacityScheduler extends AbstractYarnScheduler
} }
private synchronized void removeNode(RMNode nodeInfo) { private synchronized void removeNode(RMNode nodeInfo) {
FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
if (node == null) { if (node == null) {
return; return;
} }
@ -1015,28 +988,10 @@ public class CapacityScheduler extends AbstractYarnScheduler
@Lock(Lock.NoLock.class) @Lock(Lock.NoLock.class)
@VisibleForTesting @VisibleForTesting
@Override
public FiCaSchedulerApp getApplicationAttempt( public FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) { ApplicationAttemptId applicationAttemptId) {
SchedulerApplication app = return super.getApplicationAttempt(applicationAttemptId);
applications.get(applicationAttemptId.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
} }
@Lock(Lock.NoLock.class) @Lock(Lock.NoLock.class)
@ -1049,36 +1004,12 @@ public class CapacityScheduler extends AbstractYarnScheduler
return nodes; return nodes;
} }
@Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
@VisibleForTesting
public FiCaSchedulerApp getCurrentAttemptForContainer(
ContainerId containerId) {
SchedulerApplication app =
applications.get(containerId.getApplicationAttemptId()
.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override @Override
@Lock(Lock.NoLock.class) @Lock(Lock.NoLock.class)
public void recover(RMState state) throws Exception { public void recover(RMState state) throws Exception {
// NOT IMPLEMENTED // NOT IMPLEMENTED
} }
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
@Override @Override
public void dropContainerReservation(RMContainer container) { public void dropContainerReservation(RMContainer container) {
if(LOG.isDebugEnabled()){ if(LOG.isDebugEnabled()){

View File

@ -43,7 +43,7 @@ public interface CapacitySchedulerContext {
RMContext getRMContext(); RMContext getRMContext();
Resource getClusterResources(); Resource getClusterResource();
/** /**
* Get the yarn configuration. * Get the yarn configuration.

View File

@ -174,12 +174,12 @@ public class LeafQueue implements CSQueue {
int maxActiveApplications = int maxActiveApplications =
CSQueueUtils.computeMaxActiveApplications( CSQueueUtils.computeMaxActiveApplications(
resourceCalculator, resourceCalculator,
cs.getClusterResources(), this.minimumAllocation, cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteMaxCapacity); maxAMResourcePerQueuePercent, absoluteMaxCapacity);
this.maxActiveAppsUsingAbsCap = this.maxActiveAppsUsingAbsCap =
CSQueueUtils.computeMaxActiveApplications( CSQueueUtils.computeMaxActiveApplications(
resourceCalculator, resourceCalculator,
cs.getClusterResources(), this.minimumAllocation, cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteCapacity); maxAMResourcePerQueuePercent, absoluteCapacity);
int maxActiveApplicationsPerUser = int maxActiveApplicationsPerUser =
CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit,
@ -195,7 +195,7 @@ public class LeafQueue implements CSQueue {
cs.getConfiguration().getAcls(getQueuePath()); cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs( setupQueueConfigs(
cs.getClusterResources(), cs.getClusterResource(),
capacity, absoluteCapacity, capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor, userLimit, userLimitFactor,

View File

@ -143,7 +143,7 @@ public class ParentQueue implements CSQueue {
this.queueInfo.setQueueName(queueName); this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>()); this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
setupQueueConfigs(cs.getClusterResources(), setupQueueConfigs(cs.getClusterResource(),
capacity, absoluteCapacity, capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls); maximumCapacity, absoluteMaxCapacity, state, acls);

View File

@ -18,248 +18,84 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
public class FiCaSchedulerNode extends SchedulerNode { public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private Resource totalResourceCapability;
private volatile int numContainers;
private RMContainer reservedContainer;
/* set of containers that are allocated containers */
private final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
private final String nodeName;
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
this.rmNode = node; super(node, usePortForNodeName);
this.availableResource.setMemory(node.getTotalCapability().getMemory());
this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
totalResourceCapability =
Resource.newInstance(node.getTotalCapability().getMemory(), node
.getTotalCapability().getVirtualCores());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
nodeName = rmNode.getHostName();
}
}
public RMNode getRMNode() {
return this.rmNode;
}
public NodeId getNodeID() {
return this.rmNode.getNodeID();
}
public String getHttpAddress() {
return this.rmNode.getHttpAddress();
} }
@Override @Override
public String getNodeName() {
return nodeName;
}
@Override
public String getRackName() {
return this.rmNode.getRackName();
}
/**
* The Scheduler has allocated containers on this node to the
* given application.
*
* @param applicationId application
* @param rmContainer allocated container
*/
public synchronized void allocateContainer(ApplicationId applicationId,
RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
LOG.info("Assigned container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " +
getAvailableResource() + " available");
}
@Override
public synchronized Resource getAvailableResource() {
return this.availableResource;
}
@Override
public synchronized Resource getUsedResource() {
return this.usedResource;
}
@Override
public Resource getTotalResource() {
return this.totalResourceCapability;
}
private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId()))
return true;
return false;
}
private synchronized void updateResource(Container container) {
addAvailableResource(container.getResource());
--numContainers;
}
/**
* Release an allocated container on this node.
* @param container container to be released
*/
public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container)) {
LOG.error("Invalid container released " + container);
return;
}
/* remove the containers from the nodemanger */
if (null != launchedContainers.remove(container.getId())) {
updateResource(container);
}
LOG.info("Released container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
}
private synchronized void addAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.addTo(availableResource, resource);
Resources.subtractFrom(usedResource, resource);
}
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.subtractFrom(availableResource, resource);
Resources.addTo(usedResource, resource);
}
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
" available=" + getAvailableResource().getMemory() +
" used=" + getUsedResource().getMemory();
}
@Override
public int getNumContainers() {
return numContainers;
}
public synchronized List<RMContainer> getRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
}
public synchronized void reserveResource( public synchronized void reserveResource(
SchedulerApplicationAttempt application, Priority priority, SchedulerApplicationAttempt application, Priority priority,
RMContainer reservedContainer) { RMContainer container) {
// Check if it's already reserved // Check if it's already reserved
if (this.reservedContainer != null) { RMContainer reservedContainer = getReservedContainer();
if (reservedContainer != null) {
// Sanity check // Sanity check
if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { if (!container.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" + throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer + " container " + container +
" on node " + reservedContainer.getReservedNode() + " on node " + container.getReservedNode() +
" when currently" + " reserved resource " + this.reservedContainer + " when currently" + " reserved resource " + reservedContainer +
" on node " + this.reservedContainer.getReservedNode()); " on node " + reservedContainer.getReservedNode());
} }
// Cannot reserve more than one application attempt on a given node! // Cannot reserve more than one application attempt on a given node!
// Reservation is still against attempt. // Reservation is still against attempt.
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
reservedContainer.getContainer().getId().getApplicationAttemptId())) { .equals(container.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" + throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer + " container " + container +
" for application " + application.getApplicationAttemptId() + " for application " + application.getApplicationAttemptId() +
" when currently" + " when currently" +
" reserved container " + this.reservedContainer + " reserved container " + reservedContainer +
" on node " + this); " on node " + this);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Updated reserved container " LOG.debug("Updated reserved container "
+ reservedContainer.getContainer().getId() + " on node " + this + container.getContainer().getId() + " on node " + this
+ " for application attempt " + " for application attempt "
+ application.getApplicationAttemptId()); + application.getApplicationAttemptId());
} }
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Reserved container " LOG.debug("Reserved container "
+ reservedContainer.getContainer().getId() + " on node " + this + container.getContainer().getId() + " on node " + this
+ " for application attempt " + " for application attempt "
+ application.getApplicationAttemptId()); + application.getApplicationAttemptId());
} }
} }
this.reservedContainer = reservedContainer; setReservedContainer(container);
} }
@Override
public synchronized void unreserveResource( public synchronized void unreserveResource(
SchedulerApplicationAttempt application) { SchedulerApplicationAttempt application) {
// adding NP checks as this can now be called for preemption // adding NP checks as this can now be called for preemption
if (reservedContainer != null if (getReservedContainer() != null
&& reservedContainer.getContainer() != null && getReservedContainer().getContainer() != null
&& reservedContainer.getContainer().getId() != null && getReservedContainer().getContainer().getId() != null
&& reservedContainer.getContainer().getId().getApplicationAttemptId() != null) { && getReservedContainer().getContainer().getId()
.getApplicationAttemptId() != null) {
// Cannot unreserve for wrong application... // Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication = ApplicationAttemptId reservedApplication =
reservedContainer.getContainer().getId().getApplicationAttemptId(); getReservedContainer().getContainer().getId()
.getApplicationAttemptId();
if (!reservedApplication.equals( if (!reservedApplication.equals(
application.getApplicationAttemptId())) { application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " + throw new IllegalStateException("Trying to unreserve " +
@ -269,17 +105,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
" on node " + this); " on node " + this);
} }
} }
reservedContainer = null; setReservedContainer(null);
} }
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
}
@Override
public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
} }

View File

@ -119,9 +119,9 @@ public abstract class FSQueue extends Schedulable implements Queue {
// TODO: we might change these queue metrics around a little bit // TODO: we might change these queue metrics around a little bit
// to match the semantics of the fair scheduler. // to match the semantics of the fair scheduler.
queueInfo.setCapacity((float) getFairShare().getMemory() / queueInfo.setCapacity((float) getFairShare().getMemory() /
scheduler.getClusterCapacity().getMemory()); scheduler.getClusterResource().getMemory());
queueInfo.setCapacity((float) getResourceUsage().getMemory() / queueInfo.setCapacity((float) getResourceUsage().getMemory() /
scheduler.getClusterCapacity().getMemory()); scheduler.getClusterResource().getMemory());
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>(); ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
if (includeChildQueues) { if (includeChildQueues) {

View File

@ -18,28 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private @Private
@Unstable @Unstable
@ -47,208 +35,56 @@ public class FSSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Resource availableResource;
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private Resource totalResourceCapability;
private volatile int numContainers;
private RMContainer reservedContainer;
private AppSchedulable reservedAppSchedulable; private AppSchedulable reservedAppSchedulable;
/* set of containers that are allocated containers */
private final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
private final String nodeName;
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
this.rmNode = node; super(node, usePortForNodeName);
this.availableResource = Resources.clone(node.getTotalCapability());
totalResourceCapability =
Resource.newInstance(node.getTotalCapability().getMemory(), node
.getTotalCapability().getVirtualCores());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
nodeName = rmNode.getHostName();
}
}
public RMNode getRMNode() {
return rmNode;
}
public NodeId getNodeID() {
return rmNode.getNodeID();
}
public String getHttpAddress() {
return rmNode.getHttpAddress();
} }
@Override @Override
public String getNodeName() {
return nodeName;
}
@Override
public String getRackName() {
return rmNode.getRackName();
}
/**
* The Scheduler has allocated containers on this node to the
* given application.
*
* @param applicationId application
* @param rmContainer allocated container
*/
public synchronized void allocateContainer(ApplicationId applicationId,
RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
LOG.info("Assigned container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " +
getAvailableResource() + " available");
}
@Override
public synchronized Resource getAvailableResource() {
return availableResource;
}
@Override
public synchronized Resource getUsedResource() {
return usedResource;
}
private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId())) {
return true;
}
return false;
}
private synchronized void updateResource(Container container) {
addAvailableResource(container.getResource());
--numContainers;
}
/**
* Release an allocated container on this node.
* @param container container to be released
*/
public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container)) {
LOG.error("Invalid container released " + container);
return;
}
/* remove the containers from the nodemanger */
launchedContainers.remove(container.getId());
updateResource(container);
LOG.info("Released container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
}
private synchronized void addAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.addTo(availableResource, resource);
Resources.subtractFrom(usedResource, resource);
}
@Override
public Resource getTotalResource() {
return this.totalResourceCapability;
}
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.subtractFrom(availableResource, resource);
Resources.addTo(usedResource, resource);
}
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
" available=" + getAvailableResource() +
" used=" + getUsedResource();
}
@Override
public int getNumContainers() {
return numContainers;
}
public synchronized List<RMContainer> getRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
}
public synchronized void reserveResource( public synchronized void reserveResource(
FSSchedulerApp application, Priority priority, SchedulerApplicationAttempt application, Priority priority,
RMContainer reservedContainer) { RMContainer container) {
// Check if it's already reserved // Check if it's already reserved
if (this.reservedContainer != null) { RMContainer reservedContainer = getReservedContainer();
if (reservedContainer != null) {
// Sanity check // Sanity check
if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { if (!container.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" + throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer + " container " + container +
" on node " + reservedContainer.getReservedNode() + " on node " + container.getReservedNode() +
" when currently" + " reserved resource " + this.reservedContainer + " when currently" + " reserved resource " + reservedContainer +
" on node " + this.reservedContainer.getReservedNode()); " on node " + reservedContainer.getReservedNode());
} }
// Cannot reserve more than one application on a given node! // Cannot reserve more than one application on a given node!
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
reservedContainer.getContainer().getId().getApplicationAttemptId())) { .equals(container.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" + throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer + " container " + container +
" for application " + application.getApplicationId() + " for application " + application.getApplicationId() +
" when currently" + " when currently" +
" reserved container " + this.reservedContainer + " reserved container " + reservedContainer +
" on node " + this); " on node " + this);
} }
LOG.info("Updated reserved container " + LOG.info("Updated reserved container " +
reservedContainer.getContainer().getId() + " on node " + container.getContainer().getId() + " on node " +
this + " for application " + application); this + " for application " + application);
} else { } else {
LOG.info("Reserved container " + reservedContainer.getContainer().getId() + LOG.info("Reserved container " + container.getContainer().getId() +
" on node " + this + " for application " + application); " on node " + this + " for application " + application);
} }
this.reservedContainer = reservedContainer; setReservedContainer(container);
this.reservedAppSchedulable = application.getAppSchedulable(); this.reservedAppSchedulable = ((FSSchedulerApp) application).getAppSchedulable();
} }
@Override
public synchronized void unreserveResource( public synchronized void unreserveResource(
FSSchedulerApp application) { SchedulerApplicationAttempt application) {
// Cannot unreserve for wrong application... // Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication = ApplicationAttemptId reservedApplication =
reservedContainer.getContainer().getId().getApplicationAttemptId(); getReservedContainer().getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals( if (!reservedApplication.equals(
application.getApplicationAttemptId())) { application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " + throw new IllegalStateException("Trying to unreserve " +
@ -258,22 +94,11 @@ public class FSSchedulerNode extends SchedulerNode {
" on node " + this); " on node " + this);
} }
this.reservedContainer = null; setReservedContainer(null);
this.reservedAppSchedulable = null; this.reservedAppSchedulable = null;
} }
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
}
public synchronized AppSchedulable getReservedAppSchedulable() { public synchronized AppSchedulable getReservedAppSchedulable() {
return reservedAppSchedulable; return reservedAppSchedulable;
} }
@Override
public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
} }

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -53,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -76,10 +74,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -122,11 +118,11 @@ import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Unstable @Unstable
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class FairScheduler extends AbstractYarnScheduler { public class FairScheduler extends
AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
private boolean initialized; private boolean initialized;
private FairSchedulerConfiguration conf; private FairSchedulerConfiguration conf;
private Resource minimumAllocation;
private Resource maximumAllocation;
private Resource incrAllocation; private Resource incrAllocation;
private QueueManager queueMgr; private QueueManager queueMgr;
private Clock clock; private Clock clock;
@ -152,14 +148,6 @@ public class FairScheduler extends AbstractYarnScheduler {
// Time we last ran preemptTasksIfNecessary // Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime; private long lastPreemptCheckTime;
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster
private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted // How often tasks are preempted
protected long preemptionInterval; protected long preemptionInterval;
@ -246,23 +234,6 @@ public class FairScheduler extends AbstractYarnScheduler {
return queueMgr; return queueMgr;
} }
@Override
public RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
private FSSchedulerApp getCurrentAttemptForContainer(
ContainerId containerId) {
SchedulerApplication app =
applications.get(containerId.getApplicationAttemptId()
.getApplicationId());
if (app != null) {
return (FSSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
/** /**
* A runnable which calls {@link FairScheduler#update()} every * A runnable which calls {@link FairScheduler#update()} every
* <code>UPDATE_INTERVAL</code> milliseconds. * <code>UPDATE_INTERVAL</code> milliseconds.
@ -294,7 +265,7 @@ public class FairScheduler extends AbstractYarnScheduler {
// Recursively update demands for all queues // Recursively update demands for all queues
rootQueue.updateDemand(); rootQueue.updateDemand();
rootQueue.setFairShare(clusterCapacity); rootQueue.setFairShare(clusterResource);
// Recursively compute fair shares for all queues // Recursively compute fair shares for all queues
// and update metrics // and update metrics
rootQueue.recomputeShares(); rootQueue.recomputeShares();
@ -322,9 +293,9 @@ public class FairScheduler extends AbstractYarnScheduler {
* Is a queue below its min share for the given task type? * Is a queue below its min share for the given task type?
*/ */
boolean isStarvedForMinShare(FSLeafQueue sched) { boolean isStarvedForMinShare(FSLeafQueue sched) {
Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand()); sched.getMinShare(), sched.getDemand());
return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredShare); sched.getResourceUsage(), desiredShare);
} }
@ -333,9 +304,9 @@ public class FairScheduler extends AbstractYarnScheduler {
* defined as being below half its fair share. * defined as being below half its fair share.
*/ */
boolean isStarvedForFairShare(FSLeafQueue sched) { boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredFairShare); sched.getResourceUsage(), desiredFairShare);
} }
@ -362,7 +333,7 @@ public class FairScheduler extends AbstractYarnScheduler {
for (FSLeafQueue sched : queueMgr.getLeafQueues()) { for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
} }
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
Resources.none())) { Resources.none())) {
preemptResources(queueMgr.getLeafQueues(), resToPreempt); preemptResources(queueMgr.getLeafQueues(), resToPreempt);
} }
@ -389,7 +360,7 @@ public class FairScheduler extends AbstractYarnScheduler {
// Collect running containers from over-scheduled queues // Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>(); List<RMContainer> runningContainers = new ArrayList<RMContainer>();
for (FSLeafQueue sched : scheds) { for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), sched.getFairShare())) { sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as : sched.getRunnableAppSchedulables()) { for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) { for (RMContainer c : as.getApp().getLiveContainers()) {
@ -421,7 +392,7 @@ public class FairScheduler extends AbstractYarnScheduler {
while (warnedIter.hasNext()) { while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next(); RMContainer container = warnedIter.next();
if (container.getState() == RMContainerState.RUNNING && if (container.getState() == RMContainerState.RUNNING &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) { toPreempt, Resources.none())) {
warnOrKillContainer(container, apps.get(container), queues.get(container)); warnOrKillContainer(container, apps.get(container), queues.get(container));
preemptedThisRound.add(container); preemptedThisRound.add(container);
@ -435,12 +406,12 @@ public class FairScheduler extends AbstractYarnScheduler {
// sure we don't preempt too many from any queue // sure we don't preempt too many from any queue
Iterator<RMContainer> runningIter = runningContainers.iterator(); Iterator<RMContainer> runningIter = runningContainers.iterator();
while (runningIter.hasNext() && while (runningIter.hasNext() &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) { toPreempt, Resources.none())) {
RMContainer container = runningIter.next(); RMContainer container = runningIter.next();
FSLeafQueue sched = queues.get(container); FSLeafQueue sched = queues.get(container);
if (!preemptedThisRound.contains(container) && if (!preemptedThisRound.contains(container) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), sched.getFairShare())) { sched.getResourceUsage(), sched.getFairShare())) {
warnOrKillContainer(container, apps.get(container), sched); warnOrKillContainer(container, apps.get(container), sched);
@ -496,20 +467,20 @@ public class FairScheduler extends AbstractYarnScheduler {
Resource resDueToMinShare = Resources.none(); Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none(); Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand()); sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage())); Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
} }
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getFairShare(), sched.getDemand()); sched.getFairShare(), sched.getDemand());
resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage())); Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
} }
Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
resDueToMinShare, resDueToFairShare); resDueToMinShare, resDueToFairShare);
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
resToPreempt, Resources.none())) { resToPreempt, Resources.none())) {
String message = "Should preempt " + resToPreempt + " res for queue " String message = "Should preempt " + resToPreempt + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
@ -540,18 +511,12 @@ public class FairScheduler extends AbstractYarnScheduler {
return resourceWeights; return resourceWeights;
} }
@Override
public Resource getMinimumResourceCapability() {
return minimumAllocation;
}
public Resource getIncrementResourceCapability() { public Resource getIncrementResourceCapability() {
return incrAllocation; return incrAllocation;
} }
@Override private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
public Resource getMaximumResourceCapability() { return nodes.get(nodeId);
return maximumAllocation;
} }
public double getNodeLocalityThreshold() { public double getNodeLocalityThreshold() {
@ -578,10 +543,6 @@ public class FairScheduler extends AbstractYarnScheduler {
return continuousSchedulingSleepMs; return continuousSchedulingSleepMs;
} }
public Resource getClusterCapacity() {
return clusterCapacity;
}
public synchronized Clock getClock() { public synchronized Clock getClock() {
return clock; return clock;
} }
@ -629,8 +590,8 @@ public class FairScheduler extends AbstractYarnScheduler {
return; return;
} }
SchedulerApplication application = SchedulerApplication<FSSchedulerApp> application =
new SchedulerApplication(queue, user); new SchedulerApplication<FSSchedulerApp>(queue, user);
applications.put(applicationId, application); applications.put(applicationId, application);
queue.getMetrics().submitApp(user); queue.getMetrics().submitApp(user);
@ -647,7 +608,7 @@ public class FairScheduler extends AbstractYarnScheduler {
protected synchronized void addApplicationAttempt( protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) { boolean transferStateFromPreviousAttempt) {
SchedulerApplication application = SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser(); String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue(); FSLeafQueue queue = (FSLeafQueue) application.getQueue();
@ -720,7 +681,8 @@ public class FairScheduler extends AbstractYarnScheduler {
private synchronized void removeApplication(ApplicationId applicationId, private synchronized void removeApplication(ApplicationId applicationId,
RMAppState finalState) { RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId); SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationId);
if (application == null){ if (application == null){
LOG.warn("Couldn't find application " + applicationId); LOG.warn("Couldn't find application " + applicationId);
return; return;
@ -734,7 +696,7 @@ public class FairScheduler extends AbstractYarnScheduler {
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." + LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState); " finalState=" + rmAppAttemptFinalState);
SchedulerApplication application = SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());
FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
@ -809,7 +771,7 @@ public class FairScheduler extends AbstractYarnScheduler {
} }
// Get the node on which the container was allocated // Get the node on which the container was allocated
FSSchedulerNode node = nodes.get(container.getNodeId()); FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
if (rmContainer.getState() == RMContainerState.RESERVED) { if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(node, rmContainer.getReservedPriority()); application.unreserve(node, rmContainer.getReservedPriority());
@ -827,20 +789,20 @@ public class FairScheduler extends AbstractYarnScheduler {
private synchronized void addNode(RMNode node) { private synchronized void addNode(RMNode node) {
nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
Resources.addTo(clusterCapacity, node.getTotalCapability()); Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics(); updateRootQueueMetrics();
LOG.info("Added node " + node.getNodeAddress() + LOG.info("Added node " + node.getNodeAddress() +
" cluster capacity: " + clusterCapacity); " cluster capacity: " + clusterResource);
} }
private synchronized void removeNode(RMNode rmNode) { private synchronized void removeNode(RMNode rmNode) {
FSSchedulerNode node = nodes.get(rmNode.getNodeID()); FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
// This can occur when an UNHEALTHY node reconnects // This can occur when an UNHEALTHY node reconnects
if (node == null) { if (node == null) {
return; return;
} }
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
updateRootQueueMetrics(); updateRootQueueMetrics();
// Remove running containers // Remove running containers
@ -865,7 +827,7 @@ public class FairScheduler extends AbstractYarnScheduler {
nodes.remove(rmNode.getNodeID()); nodes.remove(rmNode.getNodeID());
LOG.info("Removed node " + rmNode.getNodeAddress() + LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterCapacity); " cluster capacity: " + clusterResource);
} }
@Override @Override
@ -882,7 +844,7 @@ public class FairScheduler extends AbstractYarnScheduler {
// Sanity check // Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
// Release containers // Release containers
for (ContainerId releasedContainerId : release) { for (ContainerId releasedContainerId : release) {
@ -961,13 +923,13 @@ public class FairScheduler extends AbstractYarnScheduler {
*/ */
private synchronized void nodeUpdate(RMNode nm) { private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
} }
eventLog.log("HEARTBEAT", nm.getHostName()); eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID()); FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
// Update resource if any change // Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@ -1012,7 +974,7 @@ public class FairScheduler extends AbstractYarnScheduler {
// iterate all nodes // iterate all nodes
for (NodeId nodeId : nodeIdList) { for (NodeId nodeId : nodeIdList) {
if (nodes.containsKey(nodeId)) { if (nodes.containsKey(nodeId)) {
FSSchedulerNode node = nodes.get(nodeId); FSSchedulerNode node = getFSSchedulerNode(nodeId);
try { try {
if (Resources.fitsIn(minimumAllocation, if (Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) { node.getAvailableResource())) {
@ -1038,7 +1000,7 @@ public class FairScheduler extends AbstractYarnScheduler {
@Override @Override
public int compare(NodeId n1, NodeId n2) { public int compare(NodeId n1, NodeId n2) {
return RESOURCE_CALCULATOR.compare(clusterCapacity, return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getAvailableResource(), nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource()); nodes.get(n1).getAvailableResource());
} }
@ -1075,7 +1037,7 @@ public class FairScheduler extends AbstractYarnScheduler {
int assignedContainers = 0; int assignedContainers = 0;
while (node.getReservedContainer() == null) { while (node.getReservedContainer() == null) {
boolean assignedContainer = false; boolean assignedContainer = false;
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
queueMgr.getRootQueue().assignContainer(node), queueMgr.getRootQueue().assignContainer(node),
Resources.none())) { Resources.none())) {
assignedContainers++; assignedContainers++;
@ -1089,45 +1051,8 @@ public class FairScheduler extends AbstractYarnScheduler {
updateRootQueueMetrics(); updateRootQueueMetrics();
} }
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
FSSchedulerNode node = nodes.get(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
SchedulerApplication app = return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId);
applications.get(appAttemptId.getApplicationId());
if (app != null) {
return (FSSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
if (attempt == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
}
return null;
}
return new SchedulerAppReport(attempt);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
if (attempt == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
}
return null;
}
return attempt.getResourceUsageReport();
} }
/** /**
@ -1139,7 +1064,7 @@ public class FairScheduler extends AbstractYarnScheduler {
private void updateRootQueueMetrics() { private void updateRootQueueMetrics() {
rootMetrics.setAvailableResourcesToQueue( rootMetrics.setAvailableResourcesToQueue(
Resources.subtract( Resources.subtract(
clusterCapacity, rootMetrics.getAllocatedResources())); clusterResource, rootMetrics.getAllocatedResources()));
} }
@Override @Override
@ -1258,7 +1183,7 @@ public class FairScheduler extends AbstractYarnScheduler {
this.rmContext = rmContext; this.rmContext = rmContext;
// This stores per-application scheduling information // This stores per-application scheduling information
this.applications = this.applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>(); new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
this.eventLog = new FairSchedulerEventLog(); this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf); eventLog.init(this.conf);
@ -1365,7 +1290,7 @@ public class FairScheduler extends AbstractYarnScheduler {
// if it does not already exist, so it can be displayed on the web UI. // if it does not already exist, so it can be displayed on the web UI.
synchronized (FairScheduler.this) { synchronized (FairScheduler.this) {
allocConf = queueInfo; allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
queueMgr.updateAllocationConfiguration(allocConf); queueMgr.updateAllocationConfiguration(allocConf);
} }
} }
@ -1385,7 +1310,7 @@ public class FairScheduler extends AbstractYarnScheduler {
@Override @Override
public synchronized String moveApplication(ApplicationId appId, public synchronized String moveApplication(ApplicationId appId,
String queueName) throws YarnException { String queueName) throws YarnException {
SchedulerApplication app = applications.get(appId); SchedulerApplication<FSSchedulerApp> app = applications.get(appId);
if (app == null) { if (app == null) {
throw new YarnException("App to be moved " + appId + " not found."); throw new YarnException("App to be moved " + appId + " not found.");
} }
@ -1449,8 +1374,8 @@ public class FairScheduler extends AbstractYarnScheduler {
* Helper for moveApplication, which has appropriate synchronization, so all * Helper for moveApplication, which has appropriate synchronization, so all
* operations will be atomic. * operations will be atomic.
*/ */
private void executeMove(SchedulerApplication app, FSSchedulerApp attempt, private void executeMove(SchedulerApplication<FSSchedulerApp> app,
FSLeafQueue oldQueue, FSLeafQueue newQueue) { FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
boolean wasRunnable = oldQueue.removeApp(attempt); boolean wasRunnable = oldQueue.removeApp(attempt);
// if app was not runnable before, it may be runnable now // if app was not runnable before, it may be runnable now
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,

View File

@ -369,7 +369,7 @@ public class QueueManager {
// Set scheduling policies // Set scheduling policies
try { try {
SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
policy.initialize(scheduler.getClusterCapacity()); policy.initialize(scheduler.getClusterResource());
queue.setPolicy(policy); queue.setPolicy(policy);
} catch (AllocationConfigurationException ex) { } catch (AllocationConfigurationException ex) {
LOG.warn("Cannot apply configured scheduling policy to queue " LOG.warn("Cannot apply configured scheduling policy to queue "

View File

@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -38,7 +37,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -76,11 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -104,7 +100,8 @@ import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Evolving @Evolving
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class FifoScheduler extends AbstractYarnScheduler implements public class FifoScheduler extends
AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
Configurable { Configurable {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class); private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@ -114,11 +111,7 @@ public class FifoScheduler extends AbstractYarnScheduler implements
Configuration conf; Configuration conf;
protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized; private boolean initialized;
private Resource minimumAllocation;
private Resource maximumAllocation;
private boolean usePortForNodeName; private boolean usePortForNodeName;
private ActiveUsersManager activeUsersManager; private ActiveUsersManager activeUsersManager;
@ -217,21 +210,11 @@ public class FifoScheduler extends AbstractYarnScheduler implements
return conf; return conf;
} }
@Override
public Resource getMinimumResourceCapability() {
return minimumAllocation;
}
@Override @Override
public int getNumClusterNodes() { public int getNumClusterNodes() {
return nodes.size(); return nodes.size();
} }
@Override
public Resource getMaximumResourceCapability() {
return maximumAllocation;
}
@Override @Override
public synchronized void public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException reinitialize(Configuration conf, RMContext rmContext) throws IOException
@ -242,7 +225,7 @@ public class FifoScheduler extends AbstractYarnScheduler implements
this.rmContext = rmContext; this.rmContext = rmContext;
//Use ConcurrentSkipListMap because applications need to be ordered //Use ConcurrentSkipListMap because applications need to be ordered
this.applications = this.applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>(); new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
this.minimumAllocation = this.minimumAllocation =
Resources.createResource(conf.getInt( Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@ -332,30 +315,6 @@ public class FifoScheduler extends AbstractYarnScheduler implements
} }
} }
@VisibleForTesting
FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication app =
applications.get(applicationAttemptId.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
private FiCaSchedulerNode getNode(NodeId nodeId) { private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId); return nodes.get(nodeId);
} }
@ -363,8 +322,8 @@ public class FifoScheduler extends AbstractYarnScheduler implements
@VisibleForTesting @VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId, public synchronized void addApplication(ApplicationId applicationId,
String queue, String user) { String queue, String user) {
SchedulerApplication application = SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication(DEFAULT_QUEUE, user); new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
applications.put(applicationId, application); applications.put(applicationId, application);
metrics.submitApp(user); metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user LOG.info("Accepted application " + applicationId + " from user: " + user
@ -377,7 +336,7 @@ public class FifoScheduler extends AbstractYarnScheduler implements
public synchronized void public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId, addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt) { boolean transferStateFromPreviousAttempt) {
SchedulerApplication application = SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId()); applications.get(appAttemptId.getApplicationId());
String user = application.getUser(); String user = application.getUser();
// TODO: Fix store // TODO: Fix store
@ -401,7 +360,8 @@ public class FifoScheduler extends AbstractYarnScheduler implements
private synchronized void doneApplication(ApplicationId applicationId, private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) { RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId); SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationId);
if (application == null){ if (application == null){
LOG.warn("Couldn't find application " + applicationId); LOG.warn("Couldn't find application " + applicationId);
return; return;
@ -419,7 +379,7 @@ public class FifoScheduler extends AbstractYarnScheduler implements
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
throws IOException { throws IOException {
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
SchedulerApplication application = SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) { if (application == null || attempt == null) {
throw new IOException("Unknown application " + applicationAttemptId + throw new IOException("Unknown application " + applicationAttemptId +
@ -456,13 +416,13 @@ public class FifoScheduler extends AbstractYarnScheduler implements
" #applications=" + applications.size()); " #applications=" + applications.size());
// Try to assign containers to applications in fifo order // Try to assign containers to applications in fifo order
for (Map.Entry<ApplicationId, SchedulerApplication> e : applications for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
.entrySet()) { .entrySet()) {
FiCaSchedulerApp application = FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
(FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
if (application == null) { if (application == null) {
continue; continue;
} }
LOG.debug("pre-assignContainers"); LOG.debug("pre-assignContainers");
application.showRequests(); application.showRequests();
synchronized (application) { synchronized (application) {
@ -499,7 +459,7 @@ public class FifoScheduler extends AbstractYarnScheduler implements
// Update the applications' headroom to correctly take into // Update the applications' headroom to correctly take into
// account the containers assigned in this update. // account the containers assigned in this update.
for (SchedulerApplication application : applications.values()) { for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
FiCaSchedulerApp attempt = FiCaSchedulerApp attempt =
(FiCaSchedulerApp) application.getCurrentAppAttempt(); (FiCaSchedulerApp) application.getCurrentAppAttempt();
if (attempt == null) { if (attempt == null) {
@ -864,7 +824,6 @@ public class FifoScheduler extends AbstractYarnScheduler implements
} }
private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) { private synchronized void removeNode(RMNode nodeInfo) {
@ -910,29 +869,12 @@ public class FifoScheduler extends AbstractYarnScheduler implements
// NOT IMPLEMENTED // NOT IMPLEMENTED
} }
@Override
public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
@Override @Override
public RMContainer getRMContainer(ContainerId containerId) { public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId); return (attempt == null) ? null : attempt.getRMContainer(containerId);
} }
private FiCaSchedulerApp getCurrentAttemptForContainer(
ContainerId containerId) {
SchedulerApplication app =
applications.get(containerId.getApplicationAttemptId()
.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override @Override
public QueueMetrics getRootQueueMetrics() { public QueueMetrics getRootQueueMetrics() {
return DEFAULT_QUEUE.getMetrics(); return DEFAULT_QUEUE.getMetrics();
@ -945,11 +887,12 @@ public class FifoScheduler extends AbstractYarnScheduler implements
} }
@Override @Override
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) { public synchronized List<ApplicationAttemptId>
getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>( List<ApplicationAttemptId> attempts =
applications.size()); new ArrayList<ApplicationAttemptId>(applications.size());
for (SchedulerApplication app : applications.values()) { for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId()); attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
} }
return attempts; return attempts;
@ -957,5 +900,4 @@ public class FifoScheduler extends AbstractYarnScheduler implements
return null; return null;
} }
} }
} }

View File

@ -70,7 +70,7 @@ public class FairSchedulerQueueInfo {
queueName = queue.getName(); queueName = queue.getName();
schedulingPolicy = queue.getPolicy().getName(); schedulingPolicy = queue.getPolicy().getName();
clusterResources = new ResourceInfo(scheduler.getClusterCapacity()); clusterResources = new ResourceInfo(scheduler.getClusterResource());
usedResources = new ResourceInfo(queue.getResourceUsage()); usedResources = new ResourceInfo(queue.getResourceUsage());
fractionMemUsed = (float)usedResources.getMemory() / fractionMemUsed = (float)usedResources.getMemory() /
@ -81,7 +81,7 @@ public class FairSchedulerQueueInfo {
maxResources = new ResourceInfo(queue.getMaxShare()); maxResources = new ResourceInfo(queue.getMaxShare());
maxResources = new ResourceInfo( maxResources = new ResourceInfo(
Resources.componentwiseMin(queue.getMaxShare(), Resources.componentwiseMin(queue.getMaxShare(),
scheduler.getClusterCapacity())); scheduler.getClusterResource()));
fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory(); fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory();
fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory(); fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();

View File

@ -456,7 +456,7 @@ public class TestProportionalCapacityPreemptionPolicy {
Resource clusterResources = Resource clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResources()).thenReturn(clusterResources); when(mCS.getClusterResource()).thenReturn(clusterResources);
return policy; return policy;
} }

View File

@ -384,15 +384,18 @@ public class TestSchedulerUtils {
Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus()); Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
} }
public static <T> SchedulerApplication verifyAppAddedAndRemovedFromScheduler( public static SchedulerApplication<SchedulerApplicationAttempt>
final Map<ApplicationId, SchedulerApplication> applications, verifyAppAddedAndRemovedFromScheduler(
EventHandler<SchedulerEvent> handler, String queueName) throws Exception { Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
EventHandler<SchedulerEvent> handler, String queueName)
throws Exception {
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppAddedSchedulerEvent appAddedEvent = AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appId, queueName, "user"); new AppAddedSchedulerEvent(appId, queueName, "user");
handler.handle(appAddedEvent); handler.handle(appAddedEvent);
SchedulerApplication app = applications.get(appId); SchedulerApplication<SchedulerApplicationAttempt> app =
applications.get(appId);
// verify application is added. // verify application is added.
Assert.assertNotNull(app); Assert.assertNotNull(app);
Assert.assertEquals("user", app.getUser()); Assert.assertEquals("user", app.getUser());

View File

@ -81,7 +81,7 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(GB, 1)); thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()). when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32)); thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResources()). when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getApplicationComparator()). when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator); thenReturn(CapacityScheduler.applicationComparator);
@ -165,7 +165,7 @@ public class TestApplicationLimits {
// Say cluster has 100 nodes of 16G each // Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
when(csContext.getClusterResources()).thenReturn(clusterResource); when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root = CSQueue root =
@ -478,7 +478,7 @@ public class TestApplicationLimits {
// Say cluster has 100 nodes of 16G each // Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB); Resource clusterResource = Resources.createResource(100 * 16 * GB);
when(csContext.getClusterResources()).thenReturn(clusterResource); when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null, "root", CapacityScheduler.parseQueue(csContext, csConf, null, "root",

View File

@ -29,8 +29,6 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -56,8 +54,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -465,14 +467,14 @@ public class TestCapacityScheduler {
cs.handle(new NodeAddedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n2)); cs.handle(new NodeAddedSchedulerEvent(n2));
Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory()); Assert.assertEquals(6 * GB, cs.getClusterResource().getMemory());
// reconnect n1 with downgraded memory // reconnect n1 with downgraded memory
n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
cs.handle(new NodeRemovedSchedulerEvent(n1)); cs.handle(new NodeRemovedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n1));
Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
} }
@Test @Test
@ -627,17 +629,17 @@ public class TestCapacityScheduler {
@Test @Test
public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
AsyncDispatcher rmDispatcher = new AsyncDispatcher();
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf); setupQueueConfiguration(conf);
cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
null, null, new RMContainerTokenSecretManager(conf), ResourceScheduler.class);
new NMTokenSecretManagerInRM(conf), MockRM rm = new MockRM(conf);
new ClientToAMTokenSecretManagerInRM(), null)); @SuppressWarnings("unchecked")
AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
(AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
.getResourceScheduler();
SchedulerApplication app = SchedulerApplication<SchedulerApplicationAttempt> app =
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
cs.getSchedulerApplications(), cs, "a1"); cs.getSchedulerApplications(), cs, "a1");
Assert.assertEquals("a1", app.getQueue().getQueueName()); Assert.assertEquals("a1", app.getQueue().getQueueName());

View File

@ -89,7 +89,7 @@ public class TestChildQueueOrder {
Resources.createResource(GB, 1)); Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn( when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB, 32)); Resources.createResource(16*GB, 32));
when(csContext.getClusterResources()). when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()). when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator); thenReturn(CapacityScheduler.applicationComparator);

View File

@ -122,7 +122,7 @@ public class TestLeafQueue {
thenReturn(Resources.createResource(GB, 1)); thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()). when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32)); thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResources()). when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()). when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator); thenReturn(CapacityScheduler.applicationComparator);
@ -1651,7 +1651,7 @@ public class TestLeafQueue {
newQueues, queues, newQueues, queues,
TestUtils.spyHook); TestUtils.spyHook);
queues = newQueues; queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResources()); root.reinitialize(newRoot, cs.getClusterResource());
// after reinitialization // after reinitialization
assertEquals(3, e.activeApplications.size()); assertEquals(3, e.activeApplications.size());
@ -1676,7 +1676,7 @@ public class TestLeafQueue {
newQueues, queues, newQueues, queues,
TestUtils.spyHook); TestUtils.spyHook);
queues = newQueues; queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResources()); root.reinitialize(newRoot, cs.getClusterResource());
// after reinitialization // after reinitialization
assertEquals(60, e.getNodeLocalityDelay()); assertEquals(60, e.getNodeLocalityDelay());
@ -2070,7 +2070,7 @@ public class TestLeafQueue {
when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(new YarnConfiguration()); when(csContext.getConf()).thenReturn(new YarnConfiguration());
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getClusterResources()).thenReturn(clusterResource); when(csContext.getClusterResource()).thenReturn(clusterResource);
when(csContext.getMinimumResourceCapability()).thenReturn( when(csContext.getMinimumResourceCapability()).thenReturn(
Resources.createResource(GB, 1)); Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn( when(csContext.getMaximumResourceCapability()).thenReturn(

View File

@ -86,7 +86,7 @@ public class TestParentQueue {
Resources.createResource(GB, 1)); Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn( when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB, 32)); Resources.createResource(16*GB, 32));
when(csContext.getClusterResources()). when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()). when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator); thenReturn(CapacityScheduler.applicationComparator);

View File

@ -43,7 +43,6 @@ import java.util.Set;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -80,8 +79,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -95,12 +97,14 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@SuppressWarnings("unchecked")
public class TestFairScheduler { public class TestFairScheduler {
static class MockClock implements Clock { static class MockClock implements Clock {
@ -377,19 +381,19 @@ public class TestFairScheduler {
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1); scheduler.handle(nodeEvent1);
assertEquals(1024, scheduler.getClusterCapacity().getMemory()); assertEquals(1024, scheduler.getClusterResource().getMemory());
// Add another node // Add another node
RMNode node2 = RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2"); MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2); scheduler.handle(nodeEvent2);
assertEquals(1536, scheduler.getClusterCapacity().getMemory()); assertEquals(1536, scheduler.getClusterResource().getMemory());
// Remove the first node // Remove the first node
NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1); NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(nodeEvent3); scheduler.handle(nodeEvent3);
assertEquals(512, scheduler.getClusterCapacity().getMemory()); assertEquals(512, scheduler.getClusterResource().getMemory());
} }
@Test @Test
@ -2123,7 +2127,7 @@ public class TestFairScheduler {
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity()); drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update(); scheduler.update();
@ -2167,7 +2171,7 @@ public class TestFairScheduler {
FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity()); drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update(); scheduler.update();
@ -2210,7 +2214,7 @@ public class TestFairScheduler {
FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity()); drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
@ -2466,8 +2470,8 @@ public class TestFairScheduler {
fs.handle(nodeEvent2); fs.handle(nodeEvent2);
// available resource // available resource
Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024); Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16); Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
// send application request // send application request
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
@ -2647,8 +2651,9 @@ public class TestFairScheduler {
@Test @Test
public void testAddAndRemoveAppFromFairScheduler() throws Exception { public void testAddAndRemoveAppFromFairScheduler() throws Exception {
FairScheduler scheduler = AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> scheduler =
(FairScheduler) resourceManager.getResourceScheduler(); (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) resourceManager
.getResourceScheduler();
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
scheduler.getSchedulerApplications(), scheduler, "default"); scheduler.getSchedulerApplications(), scheduler, "default");
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
@ -30,8 +29,6 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -60,13 +57,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -78,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -594,9 +592,12 @@ public class TestFifoScheduler {
public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { public void testAddAndRemoveAppFromFiFoScheduler() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class); ResourceScheduler.class);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); @SuppressWarnings("unchecked")
AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> fs =
(AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
.getResourceScheduler();
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
fs.getSchedulerApplications(), fs, "queue"); fs.getSchedulerApplications(), fs, "queue");
} }