Merge -c 1362332 from trunk to branch-2 to fix MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal interface to allow schedulers to maintain their own.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1362333 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-07-17 01:49:30 +00:00
parent 2b81fe68b7
commit 1f299865c0
27 changed files with 1358 additions and 566 deletions

View File

@ -1,15 +1,20 @@
Hadoop MapReduce Change Log
Release 2.0.1-alpha - UNRELEASED
Release 2.1.0-alpha - Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu)
MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
IMPROVEMENTS
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
interface to allow schedulers to maintain their own. (acmurthy)
MAPREDUCE-4146. Support limits on task status string length and number of
block locations in branch-2. (Ahmed Radwan via tomwhite)

View File

@ -192,7 +192,8 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
Resources.createResource(-1), Resources.createResource(-1),
Resources.createResource(-1));
private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
@ -383,6 +384,7 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
this.readLock.lock();
try {
ApplicationAttemptId currentApplicationAttemptId = null;
String clientToken = UNAVAILABLE;
String trackingUrl = UNAVAILABLE;
String host = UNAVAILABLE;
@ -393,19 +395,27 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
String diags = UNAVAILABLE;
if (allowAccess) {
if (this.currentAttempt != null) {
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
} else {
currentApplicationAttemptId =
BuilderUtils.newApplicationAttemptId(this.applicationId,
DUMMY_APPLICATION_ATTEMPT_NUMBER);
}
diags = this.diagnostics.toString();
} else {
appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
currentApplicationAttemptId =
BuilderUtils.newApplicationAttemptId(this.applicationId,
DUMMY_APPLICATION_ATTEMPT_NUMBER);
}
return BuilderUtils.newApplicationReport(this.applicationId,
this.currentAttempt.getAppAttemptId(), this.user, this.queue,
currentApplicationAttemptId, this.user, this.queue,
this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()), diags,
trackingUrl, this.startTime, this.finishTime, finishState,

View File

@ -56,7 +56,7 @@ public ActiveUsersManager(QueueMetrics metrics) {
* @param user application user
* @param applicationId activated application
*/
@Lock({Queue.class, SchedulerApp.class})
@Lock({Queue.class, SchedulerApplication.class})
synchronized public void activateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@ -79,7 +79,7 @@ synchronized public void activateApplication(
* @param user application user
* @param applicationId deactivated application
*/
@Lock({Queue.class, SchedulerApp.class})
@Lock({Queue.class, SchedulerApplication.class})
synchronized public void deactivateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@ -102,7 +102,7 @@ synchronized public void deactivateApplication(
* resource requests.
* @return number of active users
*/
@Lock({Queue.class, SchedulerApp.class})
@Lock({Queue.class, SchedulerApplication.class})
synchronized public int getNumActiveUsers() {
return activeUsers;
}

View File

@ -245,7 +245,8 @@ synchronized public void allocate(NodeType type, SchedulerNode node,
* @param allocatedContainers
* resources allocated to the application
*/
synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority,
synchronized private void allocateNodeLocal(
SchedulerNode node, Priority priority,
ResourceRequest nodeLocalRequest, Container container) {
// Update consumption and track allocations
allocate(container);
@ -273,7 +274,8 @@ synchronized private void allocateNodeLocal(SchedulerNode node, Priority priorit
* @param allocatedContainers
* resources allocated to the application
*/
synchronized private void allocateRackLocal(SchedulerNode node, Priority priority,
synchronized private void allocateRackLocal(
SchedulerNode node, Priority priority,
ResourceRequest rackLocalRequest, Container container) {
// Update consumption and track allocations
@ -295,7 +297,8 @@ synchronized private void allocateRackLocal(SchedulerNode node, Priority priorit
* @param allocatedContainers
* resources allocated to the application
*/
synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority,
synchronized private void allocateOffSwitch(
SchedulerNode node, Priority priority,
ResourceRequest offSwitchRequest, Container container) {
// Update consumption and track allocations

View File

@ -36,7 +36,7 @@ public class SchedulerAppReport {
private final Collection<RMContainer> reserved;
private final boolean pending;
public SchedulerAppReport(SchedulerApp app) {
public SchedulerAppReport(SchedulerApplication app) {
this.live = app.getLiveContainers();
this.reserved = app.getReservedContainers();
this.pending = app.isPending();

View File

@ -0,0 +1,43 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Represents an Application from the viewpoint of the scheduler.
* Each running Application in the RM corresponds to one instance
* of this class.
*/
@Private
@Unstable
public abstract class SchedulerApplication {
/**
* Get {@link ApplicationAttemptId} of the application master.
* @return <code>ApplicationAttemptId</code> of the application master
*/
public abstract ApplicationAttemptId getApplicationAttemptId();
/**
* Get the live containers of the application.
* @return live containers of the application
*/
public abstract Collection<RMContainer> getLiveContainers();
/**
* Get the reserved containers of the application.
* @return the reserved containers of the application
*/
public abstract Collection<RMContainer> getReservedContainers();
/**
* Is this application pending?
* @return true if it is else false.
*/
public abstract boolean isPending();
}

View File

@ -18,224 +18,45 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class SchedulerNode {
private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private volatile int numContainers;
private RMContainer reservedContainer;
/* set of containers that are allocated containers */
private final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
public static final String ANY = "*";
public SchedulerNode(RMNode node) {
this.rmNode = node;
this.availableResource.setMemory(node.getTotalCapability().getMemory());
}
public RMNode getRMNode() {
return this.rmNode;
}
public NodeId getNodeID() {
return this.rmNode.getNodeID();
}
public String getHttpAddress() {
return this.rmNode.getHttpAddress();
}
public String getHostName() {
return this.rmNode.getHostName();
}
public String getRackName() {
return this.rmNode.getRackName();
}
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
*/
@Private
@Unstable
public abstract class SchedulerNode {
/**
* The Scheduler has allocated containers on this node to the
* given application.
*
* @param applicationId application
* @param rmContainer allocated container
* Get hostname.
* @return hostname
*/
public synchronized void allocateContainer(ApplicationId applicationId,
RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
LOG.info("Assigned container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " +
getAvailableResource() + " available");
}
public synchronized Resource getAvailableResource() {
return this.availableResource;
}
public synchronized Resource getUsedResource() {
return this.usedResource;
}
private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId()))
return true;
return false;
}
private synchronized void updateResource(Container container) {
addAvailableResource(container.getResource());
--numContainers;
}
public abstract String getHostName();
/**
* Release an allocated container on this node.
* @param container container to be released
* Get rackname.
* @return rackname
*/
public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container)) {
LOG.error("Invalid container released " + container);
return;
}
public abstract String getRackName();
/**
* Get used resources on the node.
* @return used resources on the node
*/
public abstract Resource getUsedResource();
/* remove the containers from the nodemanger */
launchedContainers.remove(container.getId());
updateResource(container);
/**
* Get available resources on the node.
* @return available resources on the node
*/
public abstract Resource getAvailableResource();
LOG.info("Released container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
}
private synchronized void addAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.addTo(availableResource, resource);
Resources.subtractFrom(usedResource, resource);
}
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.subtractFrom(availableResource, resource);
Resources.addTo(usedResource, resource);
}
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
" available=" + getAvailableResource().getMemory() +
" used=" + getUsedResource().getMemory();
}
public int getNumContainers() {
return numContainers;
}
public synchronized List<RMContainer> getRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
}
public synchronized void reserveResource(
SchedulerApp application, Priority priority,
RMContainer reservedContainer) {
// Check if it's already reserved
if (this.reservedContainer != null) {
// Sanity check
if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer +
" on node " + reservedContainer.getReservedNode() +
" when currently" + " reserved resource " + this.reservedContainer +
" on node " + this.reservedContainer.getReservedNode());
}
// Cannot reserve more than one application on a given node!
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
reservedContainer.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer +
" for application " + application.getApplicationId() +
" when currently" +
" reserved container " + this.reservedContainer +
" on node " + this);
}
LOG.info("Updated reserved container " +
reservedContainer.getContainer().getId() + " on node " +
this + " for application " + application);
} else {
LOG.info("Reserved container " + reservedContainer.getContainer().getId() +
" on node " + this + " for application " + application);
}
this.reservedContainer = reservedContainer;
}
public synchronized void unreserveResource(SchedulerApp application) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
reservedContainer.getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
" for application " + application.getApplicationId() +
" when currently reserved " +
" for application " + reservedApplication.getApplicationId() +
" on node " + this);
}
reservedContainer = null;
}
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
}
/**
* Get number of active containers on the node.
* @return number of active containers on the node
*/
public abstract int getNumContainers();
}

View File

@ -33,8 +33,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
/**
* <code>CSQueue</code> represents a node in the tree of
@ -150,7 +150,7 @@ public interface CSQueue
* @param user user who submitted the application
* @param queue queue to which the application is submitted
*/
public void submitApplication(SchedulerApp application, String user,
public void submitApplication(FiCaSchedulerApp application, String user,
String queue)
throws AccessControlException;
@ -159,7 +159,7 @@ public void submitApplication(SchedulerApp application, String user,
* @param application
* @param queue application queue
*/
public void finishApplication(SchedulerApp application, String queue);
public void finishApplication(FiCaSchedulerApp application, String queue);
/**
* Assign containers to applications in the queue or it's children (if any).
@ -168,7 +168,7 @@ public void submitApplication(SchedulerApp application, String user,
* @return the assignment
*/
public CSAssignment assignContainers(
Resource clusterResource, SchedulerNode node);
Resource clusterResource, FiCaSchedulerNode node);
/**
* A container assigned to the queue has completed.
@ -182,7 +182,7 @@ public CSAssignment assignContainers(
* @param event event to be sent to the container
*/
public void completedContainer(Resource clusterResource,
SchedulerApp application, SchedulerNode node,
FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer container, ContainerStatus containerStatus,
RMContainerEventType event);
@ -219,6 +219,6 @@ public void reinitialize(CSQueue queue, Resource clusterResource)
* @param application the application for which the container was allocated
* @param container the container that was recovered.
*/
public void recoverContainer(Resource clusterResource, SchedulerApp application,
public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
Container container);
}

View File

@ -63,11 +63,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@ -103,10 +103,10 @@ public int compare(CSQueue q1, CSQueue q2) {
}
};
static final Comparator<SchedulerApp> applicationComparator =
new Comparator<SchedulerApp>() {
static final Comparator<FiCaSchedulerApp> applicationComparator =
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(SchedulerApp a1, SchedulerApp a2) {
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationId().getId() - a2.getApplicationId().getId();
}
};
@ -131,8 +131,8 @@ public Configuration getConf() {
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
private Map<NodeId, SchedulerNode> nodes =
new ConcurrentHashMap<NodeId, SchedulerNode>();
private Map<NodeId, FiCaSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private Resource clusterResource =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
@ -141,8 +141,8 @@ public Configuration getConf() {
private Resource minimumAllocation;
private Resource maximumAllocation;
private Map<ApplicationAttemptId, SchedulerApp> applications =
new ConcurrentHashMap<ApplicationAttemptId, SchedulerApp>();
private Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
private boolean initialized = false;
@ -299,7 +299,7 @@ static CSQueue parseQueue(
CSQueue parent, String queueName, Map<String, CSQueue> queues,
Map<String, CSQueue> oldQueues,
Comparator<CSQueue> queueComparator,
Comparator<SchedulerApp> applicationComparator,
Comparator<FiCaSchedulerApp> applicationComparator,
QueueHook hook) throws IOException {
CSQueue queue;
String[] childQueueNames =
@ -370,8 +370,8 @@ synchronized CSQueue getQueue(String queueName) {
}
// TODO: Fix store
SchedulerApp SchedulerApp =
new SchedulerApp(applicationAttemptId, user, queue,
FiCaSchedulerApp SchedulerApp =
new FiCaSchedulerApp(applicationAttemptId, user, queue,
queue.getActiveUsersManager(), rmContext, null);
// Submit to the queue
@ -404,7 +404,7 @@ private synchronized void doneApplication(
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
// throw new IOException("Unknown application " + applicationId +
@ -456,7 +456,7 @@ private synchronized void doneApplication(
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@ -551,7 +551,7 @@ private synchronized void nodeUpdate(RMNode nm,
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
SchedulerNode node = getNode(nm.getNodeID());
FiCaSchedulerNode node = getNode(nm.getNodeID());
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@ -578,7 +578,7 @@ private synchronized void nodeUpdate(RMNode nm,
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
SchedulerApp reservedApplication =
FiCaSchedulerApp reservedApplication =
getApplication(reservedContainer.getApplicationAttemptId());
// Try to fulfill the reservation
@ -601,10 +601,10 @@ private synchronized void nodeUpdate(RMNode nm,
}
private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@ -672,7 +672,7 @@ public void handle(SchedulerEvent event) {
}
private synchronized void addNode(RMNode nodeManager) {
this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
++numNodeManagers;
@ -681,7 +681,7 @@ private synchronized void addNode(RMNode nodeManager) {
}
private synchronized void removeNode(RMNode nodeInfo) {
SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
}
@ -726,7 +726,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
@ -735,7 +735,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
}
// Get the node on which the container was allocated
SchedulerNode node = getNode(container.getNodeId());
FiCaSchedulerNode node = getNode(container.getNodeId());
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
@ -749,24 +749,24 @@ private synchronized void completedContainer(RMContainer rmContainer,
}
@Lock(Lock.NoLock.class)
SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
return applications.get(applicationAttemptId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
SchedulerApp app = getApplication(applicationAttemptId);
FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Lock(Lock.NoLock.class)
SchedulerNode getNode(NodeId nodeId) {
FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
private RMContainer getRMContainer(ContainerId containerId) {
SchedulerApp application =
FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@ -790,7 +790,7 @@ public void recover(RMState state) throws Exception {
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
SchedulerNode node = getNode(nodeId);
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}

View File

@ -61,9 +61,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -94,11 +94,11 @@ public class LeafQueue implements CSQueue {
private float usedCapacity = 0.0f;
private volatile int numContainers;
Set<SchedulerApp> activeApplications;
Map<ApplicationAttemptId, SchedulerApp> applicationsMap =
new HashMap<ApplicationAttemptId, SchedulerApp>();
Set<FiCaSchedulerApp> activeApplications;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
Set<SchedulerApp> pendingApplications;
Set<FiCaSchedulerApp> pendingApplications;
private final Resource minimumAllocation;
private final Resource maximumAllocation;
@ -126,7 +126,7 @@ public class LeafQueue implements CSQueue {
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent,
Comparator<SchedulerApp> applicationComparator, CSQueue old) {
Comparator<FiCaSchedulerApp> applicationComparator, CSQueue old) {
this.scheduler = cs;
this.queueName = queueName;
this.parent = parent;
@ -199,8 +199,8 @@ public LeafQueue(CapacitySchedulerContext cs,
}
this.pendingApplications =
new TreeSet<SchedulerApp>(applicationComparator);
this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
new TreeSet<FiCaSchedulerApp>(applicationComparator);
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
}
private synchronized void setupQueueConfigs(
@ -580,7 +580,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
}
@Override
public void submitApplication(SchedulerApp application, String userName,
public void submitApplication(FiCaSchedulerApp application, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
@ -644,9 +644,9 @@ public void submitApplication(SchedulerApp application, String userName,
}
private synchronized void activateApplications() {
for (Iterator<SchedulerApp> i=pendingApplications.iterator();
for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator();
i.hasNext(); ) {
SchedulerApp application = i.next();
FiCaSchedulerApp application = i.next();
// Check queue limit
if (getNumActiveApplications() >= getMaximumActiveApplications()) {
@ -666,7 +666,7 @@ private synchronized void activateApplications() {
}
}
private synchronized void addApplication(SchedulerApp application, User user) {
private synchronized void addApplication(FiCaSchedulerApp application, User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
@ -686,7 +686,7 @@ private synchronized void addApplication(SchedulerApp application, User user) {
}
@Override
public void finishApplication(SchedulerApp application, String queue) {
public void finishApplication(FiCaSchedulerApp application, String queue) {
// Careful! Locking order is important!
synchronized (this) {
removeApplication(application, getUser(application.getUser()));
@ -696,7 +696,7 @@ public void finishApplication(SchedulerApp application, String queue) {
parent.finishApplication(application, queue);
}
public synchronized void removeApplication(SchedulerApp application, User user) {
public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
@ -728,7 +728,7 @@ public synchronized void removeApplication(SchedulerApp application, User user)
);
}
private synchronized SchedulerApp getApplication(
private synchronized FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
return applicationsMap.get(applicationAttemptId);
}
@ -738,7 +738,7 @@ private synchronized SchedulerApp getApplication(
@Override
public synchronized CSAssignment
assignContainers(Resource clusterResource, SchedulerNode node) {
assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getHostName()
@ -748,7 +748,7 @@ private synchronized SchedulerApp getApplication(
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
SchedulerApp application =
FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
return new CSAssignment(
assignReservedContainer(application, node, reservedContainer,
@ -758,7 +758,7 @@ private synchronized SchedulerApp getApplication(
}
// Try to assign containers to applications in order
for (SchedulerApp application : activeApplications) {
for (FiCaSchedulerApp application : activeApplications) {
if(LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application "
@ -836,8 +836,8 @@ private synchronized SchedulerApp getApplication(
}
private synchronized Resource assignReservedContainer(SchedulerApp application,
SchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
private synchronized Resource assignReservedContainer(FiCaSchedulerApp application,
FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
// Do we still need this reservation?
Priority priority = rmContainer.getReservedPriority();
if (application.getTotalRequiredResources(priority) == 0) {
@ -880,9 +880,9 @@ private synchronized boolean assignToQueue(Resource clusterResource,
return true;
}
@Lock({LeafQueue.class, SchedulerApp.class})
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
private Resource computeUserLimitAndSetHeadroom(
SchedulerApp application, Resource clusterResource, Resource required) {
FiCaSchedulerApp application, Resource clusterResource, Resource required) {
String user = application.getUser();
@ -919,7 +919,7 @@ private Resource computeUserLimitAndSetHeadroom(
}
@Lock(NoLock.class)
private Resource computeUserLimit(SchedulerApp application,
private Resource computeUserLimit(FiCaSchedulerApp application,
Resource clusterResource, Resource required) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
@ -1007,7 +1007,7 @@ static int divideAndCeil(int a, int b) {
return (a + (b - 1)) / b;
}
boolean needContainers(SchedulerApp application, Priority priority, Resource required) {
boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@ -1036,7 +1036,7 @@ boolean needContainers(SchedulerApp application, Priority priority, Resource req
}
private CSAssignment assignContainersOnNode(Resource clusterResource,
SchedulerNode node, SchedulerApp application,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
Resource assigned = Resources.none();
@ -1065,7 +1065,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
}
private Resource assignNodeLocalContainers(Resource clusterResource,
SchedulerNode node, SchedulerApp application,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, node.getHostName());
@ -1081,7 +1081,7 @@ private Resource assignNodeLocalContainers(Resource clusterResource,
}
private Resource assignRackLocalContainers(Resource clusterResource,
SchedulerNode node, SchedulerApp application, Priority priority,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, node.getRackName());
@ -1095,8 +1095,8 @@ private Resource assignRackLocalContainers(Resource clusterResource,
return Resources.none();
}
private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node,
SchedulerApp application, Priority priority,
private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, RMNode.ANY);
@ -1111,8 +1111,8 @@ private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNo
return Resources.none();
}
boolean canAssign(SchedulerApp application, Priority priority,
SchedulerNode node, NodeType type, RMContainer reservedContainer) {
boolean canAssign(FiCaSchedulerApp application, Priority priority,
FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
// Clearly we need containers for this application...
if (type == NodeType.OFF_SWITCH) {
@ -1159,14 +1159,14 @@ boolean canAssign(SchedulerApp application, Priority priority,
}
private Container getContainer(RMContainer rmContainer,
SchedulerApp application, SchedulerNode node,
FiCaSchedulerApp application, FiCaSchedulerNode node,
Resource capability, Priority priority) {
return (rmContainer != null) ? rmContainer.getContainer() :
createContainer(application, node, capability, priority);
}
public Container createContainer(SchedulerApp application, SchedulerNode node,
public Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
@ -1192,8 +1192,8 @@ public Container createContainer(SchedulerApp application, SchedulerNode node,
return container;
}
private Resource assignContainer(Resource clusterResource, SchedulerNode node,
SchedulerApp application, Priority priority,
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getHostName()
@ -1267,8 +1267,8 @@ private Resource assignContainer(Resource clusterResource, SchedulerNode node,
}
}
private void reserve(SchedulerApp application, Priority priority,
SchedulerNode node, RMContainer rmContainer, Container container) {
private void reserve(FiCaSchedulerApp application, Priority priority,
FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
// Update reserved metrics if this is the first reservation
if (rmContainer == null) {
getMetrics().reserveResource(
@ -1282,8 +1282,8 @@ private void reserve(SchedulerApp application, Priority priority,
node.reserveResource(application, priority, rmContainer);
}
private void unreserve(SchedulerApp application, Priority priority,
SchedulerNode node, RMContainer rmContainer) {
private void unreserve(FiCaSchedulerApp application, Priority priority,
FiCaSchedulerNode node, RMContainer rmContainer) {
// Done with the reservation?
application.unreserve(node, priority);
node.unreserveResource(application);
@ -1296,7 +1296,7 @@ private void unreserve(SchedulerApp application, Priority priority,
@Override
public void completedContainer(Resource clusterResource,
SchedulerApp application, SchedulerNode node, RMContainer rmContainer,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
@ -1338,7 +1338,7 @@ public void completedContainer(Resource clusterResource,
}
synchronized void allocateResource(Resource clusterResource,
SchedulerApp application, Resource resource) {
FiCaSchedulerApp application, Resource resource) {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@ -1363,7 +1363,7 @@ synchronized void allocateResource(Resource clusterResource,
}
synchronized void releaseResource(Resource clusterResource,
SchedulerApp application, Resource resource) {
FiCaSchedulerApp application, Resource resource) {
// Update queue metrics
Resources.subtractFrom(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@ -1401,7 +1401,7 @@ public synchronized void updateClusterResource(Resource clusterResource) {
this, parent, clusterResource, minimumAllocation);
// Update application properties
for (SchedulerApp application : activeApplications) {
for (FiCaSchedulerApp application : activeApplications) {
synchronized (application) {
computeUserLimitAndSetHeadroom(application, clusterResource,
Resources.none());
@ -1464,7 +1464,7 @@ public synchronized void releaseContainer(Resource resource) {
@Override
public void recoverContainer(Resource clusterResource,
SchedulerApp application, Container container) {
FiCaSchedulerApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, application, container.getResource());

View File

@ -51,8 +51,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@Private
@Evolving
@ -421,7 +421,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
}
@Override
public void submitApplication(SchedulerApp application, String user,
public void submitApplication(FiCaSchedulerApp application, String user,
String queue) throws AccessControlException {
synchronized (this) {
@ -453,7 +453,7 @@ public void submitApplication(SchedulerApp application, String user,
}
}
private synchronized void addApplication(SchedulerApp application,
private synchronized void addApplication(FiCaSchedulerApp application,
String user) {
++numApplications;
@ -466,7 +466,7 @@ private synchronized void addApplication(SchedulerApp application,
}
@Override
public void finishApplication(SchedulerApp application, String queue) {
public void finishApplication(FiCaSchedulerApp application, String queue) {
synchronized (this) {
removeApplication(application, application.getUser());
@ -478,7 +478,7 @@ public void finishApplication(SchedulerApp application, String queue) {
}
}
public synchronized void removeApplication(SchedulerApp application,
public synchronized void removeApplication(FiCaSchedulerApp application,
String user) {
--numApplications;
@ -516,7 +516,7 @@ synchronized void setMaxCapacity(float maximumCapacity) {
@Override
public synchronized CSAssignment assignContainers(
Resource clusterResource, SchedulerNode node) {
Resource clusterResource, FiCaSchedulerNode node) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
@ -594,14 +594,14 @@ private synchronized boolean assignToQueue(Resource clusterResource) {
}
private boolean canAssign(SchedulerNode node) {
private boolean canAssign(FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation);
}
synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
SchedulerNode node) {
FiCaSchedulerNode node) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
@ -654,7 +654,7 @@ void printChildQueues() {
@Override
public void completedContainer(Resource clusterResource,
SchedulerApp application, SchedulerNode node,
FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
@ -715,7 +715,7 @@ public QueueMetrics getMetrics() {
@Override
public void recoverContainer(Resource clusterResource,
SchedulerApp application, Container container) {
FiCaSchedulerApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, container.getResource());

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.ArrayList;
import java.util.Collection;
@ -28,6 +28,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -53,6 +54,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@ -63,9 +69,11 @@
* of this class.
*/
@SuppressWarnings("unchecked")
public class SchedulerApp {
@Private
@Unstable
public class FiCaSchedulerApp extends SchedulerApplication {
private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
@ -101,7 +109,7 @@ public class SchedulerApp {
.newRecordInstance(Resource.class);
private final RMContext rmContext;
public SchedulerApp(ApplicationAttemptId applicationAttemptId,
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, ApplicationStore store) {
this.rmContext = rmContext;
@ -115,6 +123,7 @@ public ApplicationId getApplicationId() {
return this.appSchedulingInfo.getApplicationId();
}
@Override
public ApplicationAttemptId getApplicationAttemptId() {
return this.appSchedulingInfo.getApplicationAttemptId();
}
@ -156,6 +165,7 @@ public Resource getResource(Priority priority) {
* Is this application pending?
* @return true if it is else false.
*/
@Override
public boolean isPending() {
return this.appSchedulingInfo.isPending();
}
@ -168,6 +178,7 @@ public String getQueueName() {
* Get the list of live containers
* @return All of the live containers
*/
@Override
public synchronized Collection<RMContainer> getLiveContainers() {
return new ArrayList<RMContainer>(liveContainers.values());
}
@ -222,7 +233,7 @@ synchronized public void containerCompleted(RMContainer rmContainer,
Resources.subtractFrom(currentConsumption, containerResource);
}
synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
@ -347,7 +358,7 @@ public synchronized Resource getCurrentReservation() {
return currentReservation;
}
public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority,
RMContainer rmContainer, Container container) {
// Create RMContainer if necessary
if (rmContainer == null) {
@ -384,7 +395,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
return rmContainer;
}
public synchronized void unreserve(SchedulerNode node, Priority priority) {
public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
@ -410,7 +421,7 @@ public synchronized void unreserve(SchedulerNode node, Priority priority) {
* @param priority priority of reserved container
* @return true is reserved, false if not
*/
public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
if (reservedContainers != null) {
@ -434,6 +445,7 @@ public synchronized float getLocalityWaitFactor(
* Get the list of reserved containers
* @return All of the reserved containers.
*/
@Override
public synchronized List<RMContainer> getReservedContainers() {
List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :

View File

@ -0,0 +1,249 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private volatile int numContainers;
private RMContainer reservedContainer;
/* set of containers that are allocated containers */
private final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
public static final String ANY = "*";
public FiCaSchedulerNode(RMNode node) {
this.rmNode = node;
this.availableResource.setMemory(node.getTotalCapability().getMemory());
}
public RMNode getRMNode() {
return this.rmNode;
}
public NodeId getNodeID() {
return this.rmNode.getNodeID();
}
public String getHttpAddress() {
return this.rmNode.getHttpAddress();
}
@Override
public String getHostName() {
return this.rmNode.getHostName();
}
@Override
public String getRackName() {
return this.rmNode.getRackName();
}
/**
* The Scheduler has allocated containers on this node to the
* given application.
*
* @param applicationId application
* @param rmContainer allocated container
*/
public synchronized void allocateContainer(ApplicationId applicationId,
RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
LOG.info("Assigned container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " +
getAvailableResource() + " available");
}
@Override
public synchronized Resource getAvailableResource() {
return this.availableResource;
}
@Override
public synchronized Resource getUsedResource() {
return this.usedResource;
}
private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId()))
return true;
return false;
}
private synchronized void updateResource(Container container) {
addAvailableResource(container.getResource());
--numContainers;
}
/**
* Release an allocated container on this node.
* @param container container to be released
*/
public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container)) {
LOG.error("Invalid container released " + container);
return;
}
/* remove the containers from the nodemanger */
launchedContainers.remove(container.getId());
updateResource(container);
LOG.info("Released container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
}
private synchronized void addAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.addTo(availableResource, resource);
Resources.subtractFrom(usedResource, resource);
}
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.subtractFrom(availableResource, resource);
Resources.addTo(usedResource, resource);
}
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
" available=" + getAvailableResource().getMemory() +
" used=" + getUsedResource().getMemory();
}
@Override
public int getNumContainers() {
return numContainers;
}
public synchronized List<RMContainer> getRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
}
public synchronized void reserveResource(
SchedulerApplication application, Priority priority,
RMContainer reservedContainer) {
// Check if it's already reserved
if (this.reservedContainer != null) {
// Sanity check
if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer +
" on node " + reservedContainer.getReservedNode() +
" when currently" + " reserved resource " + this.reservedContainer +
" on node " + this.reservedContainer.getReservedNode());
}
// Cannot reserve more than one application on a given node!
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
reservedContainer.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer +
" for application " + application.getApplicationAttemptId() +
" when currently" +
" reserved container " + this.reservedContainer +
" on node " + this);
}
LOG.info("Updated reserved container " +
reservedContainer.getContainer().getId() + " on node " +
this + " for application " + application);
} else {
LOG.info("Reserved container " + reservedContainer.getContainer().getId() +
" on node " + this + " for application " + application);
}
this.reservedContainer = reservedContainer;
}
public synchronized void unreserveResource(
SchedulerApplication application) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
reservedContainer.getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
" for application " + application.getApplicationAttemptId() +
" when currently reserved " +
" for application " + reservedApplication.getApplicationId() +
" on node " + this);
}
reservedContainer = null;
}
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
}
}

View File

@ -37,8 +37,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -69,7 +67,7 @@ public String getName() {
return app.getApplicationId().toString();
}
public SchedulerApp getApp() {
public FSSchedulerApp getApp() {
return app;
}
@ -150,7 +148,8 @@ public void setRunnable(boolean runnable) {
* given appliction on the given node with the given capability and
* priority.
*/
public Container createContainer(SchedulerApp application, SchedulerNode node,
public Container createContainer(
FSSchedulerApp application, FSSchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
@ -180,10 +179,10 @@ public Container createContainer(SchedulerApp application, SchedulerNode node,
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply
* update relevant bookeeping. This dispatches ro relevant handlers
* in the {@link SchedulerNode} and {@link SchedulerApp} classes.
* in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
*/
private void reserve(SchedulerApp application, Priority priority,
SchedulerNode node, Container container, boolean alreadyReserved) {
private void reserve(FSSchedulerApp application, Priority priority,
FSSchedulerNode node, Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getHostName() +
" app_id=" + app.getApplicationId());
if (!alreadyReserved) {
@ -209,8 +208,8 @@ private void reserve(SchedulerApp application, Priority priority,
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
* handlers for an unreservation.
*/
private void unreserve(SchedulerApp application, Priority priority,
SchedulerNode node) {
private void unreserve(FSSchedulerApp application, Priority priority,
FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
application.unreserve(node, priority);
node.unreserveResource(application);
@ -225,8 +224,8 @@ private void unreserve(SchedulerApp application, Priority priority,
* not have enough memory, create a reservation. This is called once we are
* sure the particular request should be facilitated by this node.
*/
private Resource assignContainer(SchedulerNode node,
SchedulerApp application, Priority priority,
private Resource assignContainer(FSSchedulerNode node,
FSSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, boolean reserved) {
// How much does this request need?
@ -282,7 +281,7 @@ private Resource assignContainer(SchedulerNode node,
@Override
public Resource assignContainer(SchedulerNode node, boolean reserved) {
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
if (reserved) {

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
/**
* A queue containing several applications.
@ -35,7 +34,8 @@ public class FSQueue {
private String name;
/** Applications in this specific queue; does not include children queues' jobs. */
private Collection<SchedulerApp> applications = new ArrayList<SchedulerApp>();
private Collection<FSSchedulerApp> applications =
new ArrayList<FSSchedulerApp>();
/** Scheduling mode for jobs inside the queue (fair or FIFO) */
private SchedulingMode schedulingMode;
@ -50,7 +50,7 @@ public FSQueue(FairScheduler scheduler, String name) {
this.scheduler = scheduler;
}
public Collection<SchedulerApp> getApplications() {
public Collection<FSSchedulerApp> getApplications() {
return applications;
}
@ -59,7 +59,7 @@ public void addApp(FSSchedulerApp app) {
queueSchedulable.addApp(new AppSchedulable(scheduler, app, this));
}
public void removeJob(SchedulerApp app) {
public void removeJob(FSSchedulerApp app) {
applications.remove(app);
queueSchedulable.removeApp(app);
}

View File

@ -45,8 +45,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@Private
@Unstable
@ -80,7 +78,7 @@ public void addApp(AppSchedulable app) {
appScheds.add(app);
}
public void removeApp(SchedulerApp app) {
public void removeApp(FSSchedulerApp app) {
for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
AppSchedulable appSched = it.next();
if (appSched.getApp() == app) {
@ -146,7 +144,7 @@ public long getStartTime() {
}
@Override
public Resource assignContainer(SchedulerNode node, boolean reserved) {
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
LOG.debug("Node offered to queue: " + this.getName() + " reserved: " + reserved);
// If this queue is over its limit, reject
if (Resources.greaterThan(this.getResourceUsage(),

View File

@ -18,31 +18,410 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
/**
* This class extends the application lifecycle management contained with
* the {@link SchedulerApp} class and adds delay-scheduling information
* specific to the FairScheduler.
*/
public class FSSchedulerApp extends SchedulerApp {
private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
public class FSSchedulerApp extends SchedulerApplication {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private final AppSchedulingInfo appSchedulingInfo;
private final Queue queue;
private final Resource currentConsumption = recordFactory
.newRecordInstance(Resource.class);
private Resource resourceLimit = recordFactory
.newRecordInstance(Resource.class);
private Map<ContainerId, RMContainer> liveContainers
= new HashMap<ContainerId, RMContainer>();
private List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
* asks the application for a task at this priority, it is incremented,
* and each time the application successfully schedules a task, it
* is reset to 0.
*/
Multiset<Priority> schedulingOpportunities = HashMultiset.create();
Multiset<Priority> reReservations = HashMultiset.create();
Resource currentReservation = recordFactory
.newRecordInstance(Resource.class);
private final RMContext rmContext;
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, ApplicationStore store) {
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, store);
this.queue = queue;
}
public ApplicationId getApplicationId() {
return this.appSchedulingInfo.getApplicationId();
}
@Override
public ApplicationAttemptId getApplicationAttemptId() {
return this.appSchedulingInfo.getApplicationAttemptId();
}
public String getUser() {
return this.appSchedulingInfo.getUser();
}
public synchronized void updateResourceRequests(
List<ResourceRequest> requests) {
this.appSchedulingInfo.updateResourceRequests(requests);
}
public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
return this.appSchedulingInfo.getResourceRequests(priority);
}
public int getNewContainerId() {
return this.appSchedulingInfo.getNewContainerId();
}
public Collection<Priority> getPriorities() {
return this.appSchedulingInfo.getPriorities();
}
public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress);
}
public synchronized int getTotalRequiredResources(Priority priority) {
return getResourceRequest(priority, RMNode.ANY).getNumContainers();
}
public Resource getResource(Priority priority) {
return this.appSchedulingInfo.getResource(priority);
}
/**
* Is this application pending?
* @return true if it is else false.
*/
@Override
public boolean isPending() {
return this.appSchedulingInfo.isPending();
}
public String getQueueName() {
return this.appSchedulingInfo.getQueueName();
}
/**
* Get the list of live containers
* @return All of the live containers
*/
@Override
public synchronized Collection<RMContainer> getLiveContainers() {
return new ArrayList<RMContainer>(liveContainers.values());
}
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
// Cleanup all scheduling information
this.appSchedulingInfo.stop(rmAppAttemptFinalState);
}
@SuppressWarnings("unchecked")
public synchronized void containerLaunchedOnNode(ContainerId containerId,
NodeId nodeId) {
// Inform the container
RMContainer rmContainer =
getRMContainer(containerId);
if (rmContainer == null) {
// Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(nodeId, containerId));
return;
}
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.LAUNCHED));
}
synchronized public void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(
containerId,
containerStatus,
event)
);
LOG.info("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId);
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
}
synchronized public List<Container> pullNewlyAllocatedContainers() {
List<Container> returnContainerList = new ArrayList<Container>(
newlyAllocatedContainers.size());
for (RMContainer rmContainer : newlyAllocatedContainers) {
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
RMContainerEventType.ACQUIRED));
returnContainerList.add(rmContainer.getContainer());
}
newlyAllocatedContainers.clear();
return returnContainerList;
}
public Resource getCurrentConsumption() {
return this.currentConsumption;
}
synchronized public void showRequests() {
if (LOG.isDebugEnabled()) {
for (Priority priority : getPriorities()) {
Map<String, ResourceRequest> requests = getResourceRequests(priority);
if (requests != null) {
LOG.debug("showRequests:" + " application=" + getApplicationId() +
" headRoom=" + getHeadroom() +
" currentConsumption=" + currentConsumption.getMemory());
for (ResourceRequest request : requests.values()) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " request=" + request);
}
}
}
}
}
public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
synchronized public void addSchedulingOpportunity(Priority priority) {
this.schedulingOpportunities.setCount(priority,
schedulingOpportunities.count(priority) + 1);
}
/**
* Return the number of times the application has been given an opportunity
* to schedule a task at the given priority since the last time it
* successfully did so.
*/
synchronized public int getSchedulingOpportunities(Priority priority) {
return this.schedulingOpportunities.count(priority);
}
synchronized void resetReReservations(Priority priority) {
this.reReservations.setCount(priority, 0);
}
synchronized void addReReservation(Priority priority) {
this.reReservations.add(priority);
}
synchronized public int getReReservations(Priority priority) {
return this.reReservations.count(priority);
}
public synchronized int getNumReservedContainers(Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
return (reservedContainers == null) ? 0 : reservedContainers.size();
}
/**
* Get total current reservations.
* Used only by unit tests
* @return total current reservations
*/
@Stable
@Private
public synchronized Resource getCurrentReservation() {
return currentReservation;
}
public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority,
RMContainer rmContainer, Container container) {
// Create RMContainer if necessary
if (rmContainer == null) {
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
rmContext.getContainerAllocationExpirer());
Resources.addTo(currentReservation, container.getResource());
// Reset the re-reservation count
resetReReservations(priority);
} else {
// Note down the re-reservation
addReReservation(priority);
}
rmContainer.handle(new RMContainerReservedEvent(container.getId(),
container.getResource(), node.getNodeID(), priority));
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
if (reservedContainers == null) {
reservedContainers = new HashMap<NodeId, RMContainer>();
this.reservedContainers.put(priority, reservedContainers);
}
reservedContainers.put(node.getNodeID(), rmContainer);
LOG.info("Application " + getApplicationId()
+ " reserved container " + rmContainer
+ " on node " + node + ", currently has " + reservedContainers.size()
+ " at priority " + priority
+ "; currentReservation " + currentReservation.getMemory());
return rmContainer;
}
public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(priority);
}
// Reset the re-reservation count
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size() + " at priority "
+ priority + "; currentReservation " + currentReservation);
}
/**
* Has the application reserved the given <code>node</code> at the
* given <code>priority</code>?
* @param node node to be checked
* @param priority priority of reserved container
* @return true is reserved, false if not
*/
public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
if (reservedContainers != null) {
return reservedContainers.containsKey(node.getNodeID());
}
return false;
}
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(priority).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
/**
* Get the list of reserved containers
* @return All of the reserved containers.
*/
@Override
public synchronized List<RMContainer> getReservedContainers() {
List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
this.reservedContainers.entrySet()) {
reservedContainers.addAll(e.getValue().values());
}
return reservedContainers;
}
public synchronized void setHeadroom(Resource globalLimit) {
this.resourceLimit = globalLimit;
}
/**
* Get available headroom in terms of resources for the application's user.
* @return available resource headroom
*/
public synchronized Resource getHeadroom() {
// Corner case to deal with applications being slightly over-limit
if (resourceLimit.getMemory() < 0) {
resourceLimit.setMemory(0);
}
return resourceLimit;
}
public Queue getQueue() {
return queue;
}
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
@ -62,13 +441,6 @@ public class FSSchedulerApp extends SchedulerApp {
// Time of the last container scheduled at the current allowed level
Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, ApplicationStore store) {
super(applicationAttemptId, user, queue, activeUsersManager,
rmContext, store);
}
/**
* Should be called when an application has successfully scheduled a container,
* or when the scheduling locality threshold is relaxed.
@ -78,7 +450,7 @@ public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
*/
synchronized public void resetSchedulingOpportunities(Priority priority) {
this.lastScheduledContainer.put(priority, System.currentTimeMillis());
super.resetSchedulingOpportunities(priority);
this.schedulingOpportunities.setCount(priority, 0);
}
/**
@ -127,7 +499,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL)) {
}
synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
// Update allowed locality level
@ -143,7 +515,42 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
this.resetAllowedLocalityLevel(priority, type);
}
}
return super.allocate(type, node, priority, request, container);
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(priority) <= 0) {
return null;
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, this
.getApplicationAttemptId(), node.getNodeID(), this.rmContext
.getDispatcher().getEventHandler(), this.rmContext
.getContainerAllocationExpirer());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
appSchedulingInfo.allocate(type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId()
+ " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId());
return rmContainer;
}
/**

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
public class FSSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private volatile int numContainers;
private RMContainer reservedContainer;
/* set of containers that are allocated containers */
private final Map<ContainerId, RMContainer> launchedContainers =
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
public static final String ANY = "*";
public FSSchedulerNode(RMNode node) {
this.rmNode = node;
this.availableResource.setMemory(node.getTotalCapability().getMemory());
}
public RMNode getRMNode() {
return this.rmNode;
}
public NodeId getNodeID() {
return this.rmNode.getNodeID();
}
public String getHttpAddress() {
return this.rmNode.getHttpAddress();
}
@Override
public String getHostName() {
return this.rmNode.getHostName();
}
@Override
public String getRackName() {
return this.rmNode.getRackName();
}
/**
* The Scheduler has allocated containers on this node to the
* given application.
*
* @param applicationId application
* @param rmContainer allocated container
*/
public synchronized void allocateContainer(ApplicationId applicationId,
RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
LOG.info("Assigned container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " +
getAvailableResource() + " available");
}
@Override
public synchronized Resource getAvailableResource() {
return this.availableResource;
}
@Override
public synchronized Resource getUsedResource() {
return this.usedResource;
}
private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId()))
return true;
return false;
}
private synchronized void updateResource(Container container) {
addAvailableResource(container.getResource());
--numContainers;
}
/**
* Release an allocated container on this node.
* @param container container to be released
*/
public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container)) {
LOG.error("Invalid container released " + container);
return;
}
/* remove the containers from the nodemanger */
launchedContainers.remove(container.getId());
updateResource(container);
LOG.info("Released container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
}
private synchronized void addAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.addTo(availableResource, resource);
Resources.subtractFrom(usedResource, resource);
}
private synchronized void deductAvailableResource(Resource resource) {
if (resource == null) {
LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress());
return;
}
Resources.subtractFrom(availableResource, resource);
Resources.addTo(usedResource, resource);
}
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
" available=" + getAvailableResource().getMemory() +
" used=" + getUsedResource().getMemory();
}
@Override
public int getNumContainers() {
return numContainers;
}
public synchronized List<RMContainer> getRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
}
public synchronized void reserveResource(
FSSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
// Check if it's already reserved
if (this.reservedContainer != null) {
// Sanity check
if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer +
" on node " + reservedContainer.getReservedNode() +
" when currently" + " reserved resource " + this.reservedContainer +
" on node " + this.reservedContainer.getReservedNode());
}
// Cannot reserve more than one application on a given node!
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
reservedContainer.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
" container " + reservedContainer +
" for application " + application.getApplicationId() +
" when currently" +
" reserved container " + this.reservedContainer +
" on node " + this);
}
LOG.info("Updated reserved container " +
reservedContainer.getContainer().getId() + " on node " +
this + " for application " + application);
} else {
LOG.info("Reserved container " + reservedContainer.getContainer().getId() +
" on node " + this + " for application " + application);
}
this.reservedContainer = reservedContainer;
}
public synchronized void unreserveResource(
FSSchedulerApp application) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
reservedContainer.getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
" for application " + application.getApplicationId() +
" when currently reserved " +
" for application " + reservedApplication.getApplicationId() +
" on node " + this);
}
reservedContainer = null;
}
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
}
}

View File

@ -63,9 +63,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -116,12 +114,12 @@ public class FairScheduler implements ResourceScheduler {
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
protected Map<ApplicationAttemptId, SchedulerApp> applications
= new HashMap<ApplicationAttemptId, SchedulerApp>();
protected Map<ApplicationAttemptId, FSSchedulerApp> applications
= new HashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, SchedulerNode> nodes =
new ConcurrentHashMap<NodeId, SchedulerNode>();
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster
private Resource clusterCapacity =
@ -158,7 +156,7 @@ public List<FSQueueSchedulable> getQueueSchedulables() {
}
private RMContainer getRMContainer(ContainerId containerId) {
SchedulerApp application =
FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@ -294,7 +292,8 @@ protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPree
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none()))
return;
Map<RMContainer, SchedulerApp> apps = new HashMap<RMContainer, SchedulerApp>();
Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>();
Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
// Collect running containers from over-scheduled queues
@ -526,7 +525,7 @@ private synchronized void removeApplication(
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
SchedulerApp application = applications.get(applicationAttemptId);
FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@ -576,7 +575,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
SchedulerApp application = applications.get(applicationAttemptId);
FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
@ -585,7 +584,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
}
// Get the node on which the container was allocated
SchedulerNode node = nodes.get(container.getNodeId());
FSSchedulerNode node = nodes.get(container.getNodeId());
if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(node, rmContainer.getReservedPriority());
@ -602,7 +601,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
}
private synchronized void addNode(RMNode node) {
this.nodes.put(node.getNodeID(), new SchedulerNode(node));
this.nodes.put(node.getNodeID(), new FSSchedulerNode(node));
Resources.addTo(clusterCapacity, node.getTotalCapability());
LOG.info("Added node " + node.getNodeAddress() +
@ -610,7 +609,7 @@ private synchronized void addNode(RMNode node) {
}
private synchronized void removeNode(RMNode rmNode) {
SchedulerNode node = this.nodes.get(rmNode.getNodeID());
FSSchedulerNode node = this.nodes.get(rmNode.getNodeID());
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
// Remove running containers
@ -643,7 +642,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
// Make sure this application exists
SchedulerApp application = applications.get(appAttemptId);
FSSchedulerApp application = applications.get(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@ -704,10 +703,10 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
* Process a container which has launched on a node, as reported by the
* node.
*/
private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
SchedulerApp application = applications.get(applicationAttemptId);
FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@ -726,7 +725,7 @@ private synchronized void nodeUpdate(RMNode nm,
List<ContainerStatus> completedContainers) {
LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
eventLog.log("HEARTBEAT", nm.getHostName());
SchedulerNode node = nodes.get(nm.getNodeID());
FSSchedulerNode node = nodes.get(nm.getNodeID());
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@ -749,7 +748,7 @@ private synchronized void nodeUpdate(RMNode nm,
// already, we try to complete the reservation.
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
SchedulerApp reservedApplication =
FSSchedulerApp reservedApplication =
applications.get(reservedContainer.getApplicationAttemptId());
// Try to fulfill the reservation
@ -787,7 +786,7 @@ private synchronized void nodeUpdate(RMNode nm,
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
SchedulerNode node = nodes.get(nodeId);
FSSchedulerNode node = nodes.get(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}

View File

@ -43,7 +43,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@ -422,7 +421,7 @@ public synchronized void addApp(FSSchedulerApp app) {
/**
* Remove an app
*/
public synchronized void removeJob(SchedulerApp app) {
public synchronized void removeJob(FSSchedulerApp app) {
getQueue(app.getQueueName()).removeJob(app);
}

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* A Schedulable represents an entity that can launch tasks, such as a job
@ -104,7 +103,7 @@ abstract class Schedulable {
* already exists on this node, and the schedulable should fulfill that
* reservation if possible.
*/
public abstract Resource assignContainer(SchedulerNode node, boolean reserved);
public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved);
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare) {

View File

@ -71,11 +71,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@ -103,14 +103,14 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
private RMContext rmContext;
private Map<NodeId, SchedulerNode> nodes = new ConcurrentHashMap<NodeId, SchedulerNode>();
private Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized;
private Resource minimumAllocation;
private Resource maximumAllocation;
private Map<ApplicationAttemptId, SchedulerApp> applications
= new TreeMap<ApplicationAttemptId, SchedulerApp>();
private Map<ApplicationAttemptId, FiCaSchedulerApp> applications
= new TreeMap<ApplicationAttemptId, FiCaSchedulerApp>();
private ActiveUsersManager activeUsersManager;
@ -223,7 +223,7 @@ public synchronized void reinitialize(Configuration conf,
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release) {
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@ -276,7 +276,7 @@ public Allocation allocate(
}
}
private SchedulerApp getApplication(
private FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
return applications.get(applicationAttemptId);
}
@ -284,19 +284,19 @@ private SchedulerApp getApplication(
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
SchedulerApp app = getApplication(applicationAttemptId);
FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
private SchedulerNode getNode(NodeId nodeId) {
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
private synchronized void addApplication(ApplicationAttemptId appAttemptId,
String user) {
// TODO: Fix store
SchedulerApp schedulerApp =
new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
this.rmContext, null);
applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
@ -311,7 +311,7 @@ private synchronized void doneApplication(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState)
throws IOException {
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
@ -344,15 +344,15 @@ private synchronized void doneApplication(
*
* @param node node on which resources are available to be allocated
*/
private void assignContainers(SchedulerNode node) {
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
for (Map.Entry<ApplicationAttemptId, SchedulerApp> e : applications
for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
.entrySet()) {
SchedulerApp application = e.getValue();
FiCaSchedulerApp application = e.getValue();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@ -383,15 +383,15 @@ private void assignContainers(SchedulerNode node) {
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
for (SchedulerApp application : applications.values()) {
for (FiCaSchedulerApp application : applications.values()) {
application.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
private int getMaxAllocatableContainers(SchedulerApp application,
Priority priority, SchedulerNode node, NodeType type) {
private int getMaxAllocatableContainers(FiCaSchedulerApp application,
Priority priority, FiCaSchedulerNode node, NodeType type) {
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, SchedulerNode.ANY);
application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
int maxContainers = offSwitchRequest.getNumContainers();
if (type == NodeType.OFF_SWITCH) {
@ -420,8 +420,8 @@ private int getMaxAllocatableContainers(SchedulerApp application,
}
private int assignContainersOnNode(SchedulerNode node,
SchedulerApp application, Priority priority
private int assignContainersOnNode(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority
) {
// Data-local
int nodeLocalContainers =
@ -447,8 +447,8 @@ private int assignContainersOnNode(SchedulerNode node,
return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
}
private int assignNodeLocalContainers(SchedulerNode node,
SchedulerApp application, Priority priority) {
private int assignNodeLocalContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
@ -473,15 +473,15 @@ private int assignNodeLocalContainers(SchedulerNode node,
return assignedContainers;
}
private int assignRackLocalContainers(SchedulerNode node,
SchedulerApp application, Priority priority) {
private int assignRackLocalContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getRackName());
if (request != null) {
// Don't allocate on this rack if the application doens't need containers
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, SchedulerNode.ANY);
application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
if (offSwitchRequest.getNumContainers() <= 0) {
return 0;
}
@ -498,11 +498,11 @@ private int assignRackLocalContainers(SchedulerNode node,
return assignedContainers;
}
private int assignOffSwitchContainers(SchedulerNode node,
SchedulerApp application, Priority priority) {
private int assignOffSwitchContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, SchedulerNode.ANY);
application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
if (request != null) {
assignedContainers =
assignContainer(node, application, priority,
@ -511,7 +511,7 @@ private int assignOffSwitchContainers(SchedulerNode node,
return assignedContainers;
}
private int assignContainer(SchedulerNode node, SchedulerApp application,
private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, int assignableContainers,
ResourceRequest request, NodeType type) {
LOG.debug("assignContainers:" +
@ -577,7 +577,7 @@ private int assignContainer(SchedulerNode node, SchedulerApp application,
private synchronized void nodeUpdate(RMNode rmNode,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
SchedulerNode node = getNode(rmNode.getNodeID());
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@ -667,10 +667,10 @@ public void handle(SchedulerEvent event) {
}
}
private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@ -696,10 +696,10 @@ private synchronized void containerCompleted(RMContainer rmContainer,
// Get the application for the finished container
Container container = rmContainer.getContainer();
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
// Get the node on which the container was allocated
SchedulerNode node = getNode(container.getNodeId());
FiCaSchedulerNode node = getNode(container.getNodeId());
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
@ -729,7 +729,7 @@ private synchronized void containerCompleted(RMContainer rmContainer,
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) {
SchedulerNode node = getNode(nodeInfo.getNodeID());
FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
if (node == null) {
return;
}
@ -761,7 +761,7 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
}
private synchronized void addNode(RMNode nodeManager) {
this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}
@ -778,12 +778,12 @@ public void recover(RMState state) {
@Override
public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
SchedulerNode node = getNode(nodeId);
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
private RMContainer getRMContainer(ContainerId containerId) {
SchedulerApp application =
FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}

View File

@ -55,7 +55,7 @@
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.BuilderUtils;
@Private
@ -72,7 +72,7 @@ public class NodeManager implements ContainerManager {
Resource used = recordFactory.newRecordInstance(Resource.class);
final ResourceTrackerService resourceTrackerService;
final SchedulerNode schedulerNode;
final FiCaSchedulerNode schedulerNode;
final Map<ApplicationId, List<Container>> containers =
new HashMap<ApplicationId, List<Container>>();
@ -98,7 +98,7 @@ public NodeManager(String hostName, int containerManagerPort, int httpPort,
request.setNodeId(this.nodeId);
resourceTrackerService.registerNodeManager(request)
.getRegistrationResponse();
this.schedulerNode = new SchedulerNode(rmContext.getRMNodes().get(
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
this.nodeId));
// Sanity check

View File

@ -40,8 +40,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -111,8 +111,8 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
LOG.info("Setup top-level queues a and b");
}
private SchedulerApp getMockApplication(int appId, String user) {
SchedulerApp application = mock(SchedulerApp.class);
private FiCaSchedulerApp getMockApplication(int appId, String user) {
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
ApplicationAttemptId applicationAttemptId =
TestUtils.getMockApplicationAttemptId(appId, 0);
doReturn(applicationAttemptId.getApplicationId()).
@ -209,7 +209,7 @@ public void testActiveApplicationLimits() throws Exception {
int APPLICATION_ID = 0;
// Submit first application
SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_0, user_0, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@ -217,7 +217,7 @@ public void testActiveApplicationLimits() throws Exception {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit second application
SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_1, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@ -225,7 +225,7 @@ public void testActiveApplicationLimits() throws Exception {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit third application, should remain pending
SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_2, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@ -240,7 +240,7 @@ public void testActiveApplicationLimits() throws Exception {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit another one for user_0
SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_3, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@ -251,7 +251,7 @@ public void testActiveApplicationLimits() throws Exception {
doReturn(3).when(queue).getMaximumActiveApplications();
// Submit first app for user_1
SchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
queue.submitApplication(app_4, user_1, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@ -261,7 +261,7 @@ public void testActiveApplicationLimits() throws Exception {
assertEquals(0, queue.getNumPendingApplications(user_1));
// Submit second app for user_1, should block due to queue-limit
SchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
queue.submitApplication(app_5, user_1, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
@ -290,7 +290,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
doReturn(2).when(queue).getMaximumActiveApplications();
// Submit first application
SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_0, user_0, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@ -299,7 +299,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertTrue(queue.activeApplications.contains(app_0));
// Submit second application
SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_1, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@ -308,7 +308,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertTrue(queue.activeApplications.contains(app_1));
// Submit third application, should remain pending
SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_2, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@ -317,7 +317,7 @@ public void testActiveLimitsWithKilledApps() throws Exception {
assertTrue(queue.pendingApplications.contains(app_2));
// Submit fourth application, should remain pending
SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_3, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
@ -393,7 +393,7 @@ public void testHeadroom() throws Exception {
String host_0 = "host_0";
String rack_0 = "rack_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB);
final String user_0 = "user_0";
final String user_1 = "user_1";
@ -408,8 +408,8 @@ public void testHeadroom() throws Exception {
// and check headroom
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0_0 =
spy(new SchedulerApp(appAttemptId_0_0, user_0, queue,
FiCaSchedulerApp app_0_0 =
spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_0_0, user_0, A);
@ -427,8 +427,8 @@ public void testHeadroom() throws Exception {
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_0_1 =
spy(new SchedulerApp(appAttemptId_0_1, user_0, queue,
FiCaSchedulerApp app_0_1 =
spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_0_1, user_0, A);
@ -446,8 +446,8 @@ public void testHeadroom() throws Exception {
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_1_0 =
spy(new SchedulerApp(appAttemptId_1_0, user_1, queue,
FiCaSchedulerApp app_1_0 =
spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_1_0, user_1, A);

View File

@ -62,8 +62,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.junit.After;
import org.junit.Before;
@ -171,14 +171,14 @@ static LeafQueue stubLeafQueue(LeafQueue queue) {
@Override
public Container answer(InvocationOnMock invocation)
throws Throwable {
final SchedulerApp application =
(SchedulerApp)(invocation.getArguments()[0]);
final FiCaSchedulerApp application =
(FiCaSchedulerApp)(invocation.getArguments()[0]);
final ContainerId containerId =
TestUtils.getMockContainerId(application);
Container container = TestUtils.getMockContainer(
containerId,
((SchedulerNode)(invocation.getArguments()[1])).getNodeID(),
((FiCaSchedulerNode)(invocation.getArguments()[1])).getNodeID(),
(Resource)(invocation.getArguments()[2]),
((Priority)invocation.getArguments()[3]));
return container;
@ -186,8 +186,8 @@ public Container answer(InvocationOnMock invocation)
}
).
when(queue).createContainer(
any(SchedulerApp.class),
any(SchedulerNode.class),
any(FiCaSchedulerApp.class),
any(FiCaSchedulerNode.class),
any(Resource.class),
any(Priority.class)
);
@ -195,7 +195,7 @@ public Container answer(InvocationOnMock invocation)
// 2. Stub out LeafQueue.parent.completedContainer
CSQueue parent = queue.getParent();
doNothing().when(parent).completedContainer(
any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class),
any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
any(RMContainer.class), any(ContainerStatus.class),
any(RMContainerEventType.class));
@ -238,22 +238,22 @@ public void testSingleQueueOneUserMetrics() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, B);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@ -284,14 +284,14 @@ public void testUserQueueAcl() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_d, d, null,
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null,
rmContext, null);
d.submitApplication(app_0, user_d, D);
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_d, d, null,
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null,
rmContext, null);
d.submitApplication(app_1, user_d, D); // same user
}
@ -309,7 +309,7 @@ public void testAppAttemptMetrics() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null,
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null,
rmContext, null);
a.submitApplication(app_0, user_0, B);
@ -324,7 +324,7 @@ public void testAppAttemptMetrics() throws Exception {
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null,
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
@ -359,22 +359,22 @@ public void testSingleQueueWithOneUser() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@ -483,30 +483,30 @@ public void testUserLimits() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@ -576,30 +576,30 @@ public void testHeadroomWithMaxCap() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@ -687,35 +687,35 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
SchedulerApp app_3 =
new SchedulerApp(appAttemptId_3, user_2, a,
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_3, user_2, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@ -862,21 +862,21 @@ public void testReservation() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
@ -961,23 +961,23 @@ public void testStolenReservedContainer() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
@ -1060,24 +1060,24 @@ public void testReservationExchange() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a,
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
@ -1175,23 +1175,23 @@ public void testLocalityScheduling() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "host_0";
String rack_0 = "rack_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
String host_1 = "host_1";
String rack_1 = "rack_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
String host_2 = "host_2";
String rack_2 = "rack_2";
SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@ -1284,7 +1284,7 @@ public void testLocalityScheduling() throws Exception {
assertEquals(1, app_0.getTotalRequiredResources(priority));
String host_3 = "host_3"; // on rack_1
SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
assignment = a.assignContainers(clusterResource, node_3);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
@ -1305,23 +1305,23 @@ public void testApplicationPriorityScheduling() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "host_0";
String rack_0 = "rack_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
String host_1 = "host_1";
String rack_1 = "rack_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
String host_2 = "host_2";
String rack_2 = "rack_2";
SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@ -1435,22 +1435,22 @@ public void testSchedulingConstraints() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a,
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0_0 = "host_0_0";
String rack_0 = "rack_0";
SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
String host_0_1 = "host_0_1";
SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
String host_1_0 = "host_1_0";
String rack_1 = "rack_1";
SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));

View File

@ -44,8 +44,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -99,22 +99,22 @@ private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
LOG.info("Setup top-level queues a and b");
}
private SchedulerApp getMockApplication(int appId, String user) {
SchedulerApp application = mock(SchedulerApp.class);
private FiCaSchedulerApp getMockApplication(int appId, String user) {
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
doReturn(user).when(application).getUser();
doReturn(Resources.createResource(0)).when(application).getHeadroom();
return application;
}
private void stubQueueAllocation(final CSQueue queue,
final Resource clusterResource, final SchedulerNode node,
final Resource clusterResource, final FiCaSchedulerNode node,
final int allocation) {
stubQueueAllocation(queue, clusterResource, node, allocation,
NodeType.NODE_LOCAL);
}
private void stubQueueAllocation(final CSQueue queue,
final Resource clusterResource, final SchedulerNode node,
final Resource clusterResource, final FiCaSchedulerNode node,
final int allocation, final NodeType type) {
// Simulate the queue allocation
@ -132,7 +132,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
((ParentQueue)queue).allocateResource(clusterResource,
allocatedResource);
} else {
SchedulerApp app1 = getMockApplication(0, "");
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
allocatedResource);
}
@ -198,9 +198,9 @@ public void testSingleLevelQueues() throws Exception {
final int memoryPerNode = 10;
final int numNodes = 2;
SchedulerNode node_0 =
FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@ -224,9 +224,9 @@ public void testSingleLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -237,9 +237,9 @@ public void testSingleLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -250,9 +250,9 @@ public void testSingleLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@ -263,9 +263,9 @@ public void testSingleLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_1);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@ -346,11 +346,11 @@ public void testMultiLevelQueues() throws Exception {
final int memoryPerNode = 10;
final int numNodes = 3;
SchedulerNode node_0 =
FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_2 =
FiCaSchedulerNode node_2 =
TestUtils.getMockNode("host_2", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@ -401,11 +401,11 @@ public void testMultiLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_0);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -427,13 +427,13 @@ public void testMultiLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_2);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@ -457,9 +457,9 @@ public void testOffSwitchScheduling() throws Exception {
final int memoryPerNode = 10;
final int numNodes = 2;
SchedulerNode node_0 =
FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@ -484,9 +484,9 @@ public void testOffSwitchScheduling() throws Exception {
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -498,9 +498,9 @@ public void testOffSwitchScheduling() throws Exception {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -523,9 +523,9 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
final int memoryPerNode = 10;
final int numNodes = 2;
SchedulerNode node_0 =
FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@ -550,9 +550,9 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@ -564,9 +564,9 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
any(FiCaSchedulerNode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);

View File

@ -44,8 +44,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
public class TestUtils {
@ -140,7 +140,7 @@ public static ApplicationId getMockApplicationId(int appId) {
return applicationAttemptId;
}
public static SchedulerNode getMockNode(
public static FiCaSchedulerNode getMockNode(
String host, String rack, int port, int capability) {
NodeId nodeId = mock(NodeId.class);
when(nodeId.getHost()).thenReturn(host);
@ -153,12 +153,12 @@ public static SchedulerNode getMockNode(
when(rmNode.getHostName()).thenReturn(host);
when(rmNode.getRackName()).thenReturn(rack);
SchedulerNode node = spy(new SchedulerNode(rmNode));
FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode));
LOG.info("node = " + host + " avail=" + node.getAvailableResource());
return node;
}
public static ContainerId getMockContainerId(SchedulerApp application) {
public static ContainerId getMockContainerId(FiCaSchedulerApp application) {
ContainerId containerId = mock(ContainerId.class);
doReturn(application.getApplicationAttemptId()).
when(containerId).getApplicationAttemptId();