diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d5f14408c43..799993114bd 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1,15 +1,20 @@ Hadoop MapReduce Change Log -Release 2.0.1-alpha - UNRELEASED +Release 2.1.0-alpha - Unreleased INCOMPATIBLE CHANGES NEW FEATURES + MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu) + MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu) IMPROVEMENTS + MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal + interface to allow schedulers to maintain their own. (acmurthy) + MAPREDUCE-4146. Support limits on task status string length and number of block locations in branch-2. (Ahmed Radwan via tomwhite) diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cf3e8616b73..385cf0afd65 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -192,7 +192,8 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition()) BuilderUtils.newApplicationResourceUsageReport(-1, -1, Resources.createResource(-1), Resources.createResource(-1), Resources.createResource(-1)); - + private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1; + public RMAppImpl(ApplicationId applicationId, RMContext rmContext, Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, String clientTokenStr, @@ -383,6 +384,7 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) { this.readLock.lock(); try { + ApplicationAttemptId currentApplicationAttemptId = null; String clientToken = UNAVAILABLE; String trackingUrl = UNAVAILABLE; String host = UNAVAILABLE; @@ -393,19 +395,27 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) { String diags = UNAVAILABLE; if (allowAccess) { if (this.currentAttempt != null) { + currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); trackingUrl = this.currentAttempt.getTrackingUrl(); origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl(); clientToken = this.currentAttempt.getClientToken(); host = this.currentAttempt.getHost(); rpcPort = this.currentAttempt.getRpcPort(); appUsageReport = currentAttempt.getApplicationResourceUsageReport(); + } else { + currentApplicationAttemptId = + BuilderUtils.newApplicationAttemptId(this.applicationId, + DUMMY_APPLICATION_ATTEMPT_NUMBER); } diags = this.diagnostics.toString(); } else { appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; + currentApplicationAttemptId = + BuilderUtils.newApplicationAttemptId(this.applicationId, + DUMMY_APPLICATION_ATTEMPT_NUMBER); } return BuilderUtils.newApplicationReport(this.applicationId, - this.currentAttempt.getAppAttemptId(), this.user, this.queue, + currentApplicationAttemptId, this.user, this.queue, this.name, host, rpcPort, clientToken, createApplicationState(this.stateMachine.getCurrentState()), diags, trackingUrl, this.startTime, this.finishTime, finishState, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java index 18fbca654c2..e9c946e0ad9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java @@ -56,7 +56,7 @@ public ActiveUsersManager(QueueMetrics metrics) { * @param user application user * @param applicationId activated application */ - @Lock({Queue.class, SchedulerApp.class}) + @Lock({Queue.class, SchedulerApplication.class}) synchronized public void activateApplication( String user, ApplicationId applicationId) { Set userApps = usersApplications.get(user); @@ -79,7 +79,7 @@ synchronized public void activateApplication( * @param user application user * @param applicationId deactivated application */ - @Lock({Queue.class, SchedulerApp.class}) + @Lock({Queue.class, SchedulerApplication.class}) synchronized public void deactivateApplication( String user, ApplicationId applicationId) { Set userApps = usersApplications.get(user); @@ -102,7 +102,7 @@ synchronized public void deactivateApplication( * resource requests. * @return number of active users */ - @Lock({Queue.class, SchedulerApp.class}) + @Lock({Queue.class, SchedulerApplication.class}) synchronized public int getNumActiveUsers() { return activeUsers; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 5d11e52711c..7c44748e34b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -245,7 +245,8 @@ synchronized public void allocate(NodeType type, SchedulerNode node, * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority, + synchronized private void allocateNodeLocal( + SchedulerNode node, Priority priority, ResourceRequest nodeLocalRequest, Container container) { // Update consumption and track allocations allocate(container); @@ -273,7 +274,8 @@ synchronized private void allocateNodeLocal(SchedulerNode node, Priority priorit * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateRackLocal(SchedulerNode node, Priority priority, + synchronized private void allocateRackLocal( + SchedulerNode node, Priority priority, ResourceRequest rackLocalRequest, Container container) { // Update consumption and track allocations @@ -295,7 +297,8 @@ synchronized private void allocateRackLocal(SchedulerNode node, Priority priorit * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority, + synchronized private void allocateOffSwitch( + SchedulerNode node, Priority priority, ResourceRequest offSwitchRequest, Container container) { // Update consumption and track allocations diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java index c5ce4af9dbe..f1dc9d2ae37 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java @@ -36,7 +36,7 @@ public class SchedulerAppReport { private final Collection reserved; private final boolean pending; - public SchedulerAppReport(SchedulerApp app) { + public SchedulerAppReport(SchedulerApplication app) { this.live = app.getLiveContainers(); this.reserved = app.getReservedContainers(); this.pending = app.isPending(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java new file mode 100644 index 00000000000..51d65e3969f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -0,0 +1,43 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * Represents an Application from the viewpoint of the scheduler. + * Each running Application in the RM corresponds to one instance + * of this class. + */ +@Private +@Unstable +public abstract class SchedulerApplication { + + /** + * Get {@link ApplicationAttemptId} of the application master. + * @return ApplicationAttemptId of the application master + */ + public abstract ApplicationAttemptId getApplicationAttemptId(); + + /** + * Get the live containers of the application. + * @return live containers of the application + */ + public abstract Collection getLiveContainers(); + + /** + * Get the reserved containers of the application. + * @return the reserved containers of the application + */ + public abstract Collection getReservedContainers(); + + /** + * Is this application pending? + * @return true if it is else false. + */ + public abstract boolean isPending(); + +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 7e51841495f..a08ba7090ab 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -18,224 +18,45 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -public class SchedulerNode { - - private static final Log LOG = LogFactory.getLog(SchedulerNode.class); - - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource = recordFactory.newRecordInstance(Resource.class); - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - - private volatile int numContainers; - - private RMContainer reservedContainer; - - /* set of containers that are allocated containers */ - private final Map launchedContainers = - new HashMap(); - - private final RMNode rmNode; - - public static final String ANY = "*"; - - public SchedulerNode(RMNode node) { - this.rmNode = node; - this.availableResource.setMemory(node.getTotalCapability().getMemory()); - } - - public RMNode getRMNode() { - return this.rmNode; - } - - public NodeId getNodeID() { - return this.rmNode.getNodeID(); - } - - public String getHttpAddress() { - return this.rmNode.getHttpAddress(); - } - - public String getHostName() { - return this.rmNode.getHostName(); - } - - public String getRackName() { - return this.rmNode.getRackName(); - } +/** + * Represents a YARN Cluster Node from the viewpoint of the scheduler. + */ +@Private +@Unstable +public abstract class SchedulerNode { /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container + * Get hostname. + * @return hostname */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - public synchronized Resource getAvailableResource() { - return this.availableResource; - } - - public synchronized Resource getUsedResource() { - return this.usedResource; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) - return true; - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } + public abstract String getHostName(); /** - * Release an allocated container on this node. - * @param container container to be released + * Get rackname. + * @return rackname */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } + public abstract String getRackName(); + + /** + * Get used resources on the node. + * @return used resources on the node + */ + public abstract Resource getUsedResource(); - /* remove the containers from the nodemanger */ - launchedContainers.remove(container.getId()); - updateResource(container); + /** + * Get available resources on the node. + * @return available resources on the node + */ + public abstract Resource getAvailableResource(); - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); - } - - @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource().getMemory() + - " used=" + getUsedResource().getMemory(); - } - - public int getNumContainers() { - return numContainers; - } - - public synchronized List getRunningContainers() { - return new ArrayList(launchedContainers.values()); - } - - public synchronized void reserveResource( - SchedulerApp application, Priority priority, - RMContainer reservedContainer) { - // Check if it's already reserved - if (this.reservedContainer != null) { - // Sanity check - if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { - throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); - } - - // Cannot reserve more than one application on a given node! - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( - reservedContainer.getContainer().getId().getApplicationAttemptId())) { - throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " for application " + application.getApplicationId() + - " when currently" + - " reserved container " + this.reservedContainer + - " on node " + this); - } - - LOG.info("Updated reserved container " + - reservedContainer.getContainer().getId() + " on node " + - this + " for application " + application); - } else { - LOG.info("Reserved container " + reservedContainer.getContainer().getId() + - " on node " + this + " for application " + application); - } - this.reservedContainer = reservedContainer; - } - - public synchronized void unreserveResource(SchedulerApp application) { - // Cannot unreserve for wrong application... - ApplicationAttemptId reservedApplication = - reservedContainer.getContainer().getId().getApplicationAttemptId(); - if (!reservedApplication.equals( - application.getApplicationAttemptId())) { - throw new IllegalStateException("Trying to unreserve " + - " for application " + application.getApplicationId() + - " when currently reserved " + - " for application " + reservedApplication.getApplicationId() + - " on node " + this); - } - - reservedContainer = null; - } - - public synchronized RMContainer getReservedContainer() { - return reservedContainer; - } + /** + * Get number of active containers on the node. + * @return number of active containers on the node + */ + public abstract int getNumContainers(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 176a948e439..8a43b2da27b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; /** * CSQueue represents a node in the tree of @@ -150,7 +150,7 @@ public interface CSQueue * @param user user who submitted the application * @param queue queue to which the application is submitted */ - public void submitApplication(SchedulerApp application, String user, + public void submitApplication(FiCaSchedulerApp application, String user, String queue) throws AccessControlException; @@ -159,7 +159,7 @@ public void submitApplication(SchedulerApp application, String user, * @param application * @param queue application queue */ - public void finishApplication(SchedulerApp application, String queue); + public void finishApplication(FiCaSchedulerApp application, String queue); /** * Assign containers to applications in the queue or it's children (if any). @@ -168,7 +168,7 @@ public void submitApplication(SchedulerApp application, String user, * @return the assignment */ public CSAssignment assignContainers( - Resource clusterResource, SchedulerNode node); + Resource clusterResource, FiCaSchedulerNode node); /** * A container assigned to the queue has completed. @@ -182,7 +182,7 @@ public CSAssignment assignContainers( * @param event event to be sent to the container */ public void completedContainer(Resource clusterResource, - SchedulerApp application, SchedulerNode node, + FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer container, ContainerStatus containerStatus, RMContainerEventType event); @@ -219,6 +219,6 @@ public void reinitialize(CSQueue queue, Resource clusterResource) * @param application the application for which the container was allocated * @param container the container that was recovered. */ - public void recoverContainer(Resource clusterResource, SchedulerApp application, + public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, Container container); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index f304b0a4d77..8028e133eef 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -63,11 +63,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; @@ -103,10 +103,10 @@ public int compare(CSQueue q1, CSQueue q2) { } }; - static final Comparator applicationComparator = - new Comparator() { + static final Comparator applicationComparator = + new Comparator() { @Override - public int compare(SchedulerApp a1, SchedulerApp a2) { + public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { return a1.getApplicationId().getId() - a2.getApplicationId().getId(); } }; @@ -131,8 +131,8 @@ public Configuration getConf() { private Map queues = new ConcurrentHashMap(); - private Map nodes = - new ConcurrentHashMap(); + private Map nodes = + new ConcurrentHashMap(); private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); @@ -141,8 +141,8 @@ public Configuration getConf() { private Resource minimumAllocation; private Resource maximumAllocation; - private Map applications = - new ConcurrentHashMap(); + private Map applications = + new ConcurrentHashMap(); private boolean initialized = false; @@ -299,7 +299,7 @@ static CSQueue parseQueue( CSQueue parent, String queueName, Map queues, Map oldQueues, Comparator queueComparator, - Comparator applicationComparator, + Comparator applicationComparator, QueueHook hook) throws IOException { CSQueue queue; String[] childQueueNames = @@ -370,8 +370,8 @@ synchronized CSQueue getQueue(String queueName) { } // TODO: Fix store - SchedulerApp SchedulerApp = - new SchedulerApp(applicationAttemptId, user, queue, + FiCaSchedulerApp SchedulerApp = + new FiCaSchedulerApp(applicationAttemptId, user, queue, queue.getActiveUsersManager(), rmContext, null); // Submit to the queue @@ -404,7 +404,7 @@ private synchronized void doneApplication( LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { // throw new IOException("Unknown application " + applicationId + @@ -456,7 +456,7 @@ private synchronized void doneApplication( public Allocation allocate(ApplicationAttemptId applicationAttemptId, List ask, List release) { - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + applicationAttemptId); @@ -551,7 +551,7 @@ private synchronized void nodeUpdate(RMNode nm, LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource); } - SchedulerNode node = getNode(nm.getNodeID()); + FiCaSchedulerNode node = getNode(nm.getNodeID()); // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { @@ -578,7 +578,7 @@ private synchronized void nodeUpdate(RMNode nm, RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - SchedulerApp reservedApplication = + FiCaSchedulerApp reservedApplication = getApplication(reservedContainer.getApplicationAttemptId()); // Try to fulfill the reservation @@ -601,10 +601,10 @@ private synchronized void nodeUpdate(RMNode nm, } - private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { // Get the application for the finished container ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + @@ -672,7 +672,7 @@ public void handle(SchedulerEvent event) { } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager)); + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); ++numNodeManagers; @@ -681,7 +681,7 @@ private synchronized void addNode(RMNode nodeManager) { } private synchronized void removeNode(RMNode nodeInfo) { - SchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); + FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } @@ -726,7 +726,7 @@ private synchronized void completedContainer(RMContainer rmContainer, // Get the application for the finished container ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Container " + container + " of" + " unknown application " + applicationAttemptId + @@ -735,7 +735,7 @@ private synchronized void completedContainer(RMContainer rmContainer, } // Get the node on which the container was allocated - SchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = getNode(container.getNodeId()); // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); @@ -749,24 +749,24 @@ private synchronized void completedContainer(RMContainer rmContainer, } @Lock(Lock.NoLock.class) - SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) { + FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) { return applications.get(applicationAttemptId); } @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId applicationAttemptId) { - SchedulerApp app = getApplication(applicationAttemptId); + FiCaSchedulerApp app = getApplication(applicationAttemptId); return app == null ? null : new SchedulerAppReport(app); } @Lock(Lock.NoLock.class) - SchedulerNode getNode(NodeId nodeId) { + FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } private RMContainer getRMContainer(ContainerId containerId) { - SchedulerApp application = + FiCaSchedulerApp application = getApplication(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } @@ -790,7 +790,7 @@ public void recover(RMState state) throws Exception { @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { - SchedulerNode node = getNode(nodeId); + FiCaSchedulerNode node = getNode(nodeId); return node == null ? null : new SchedulerNodeReport(node); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 0d7988d07ba..ebf594b33c1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -61,9 +61,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -94,11 +94,11 @@ public class LeafQueue implements CSQueue { private float usedCapacity = 0.0f; private volatile int numContainers; - Set activeApplications; - Map applicationsMap = - new HashMap(); + Set activeApplications; + Map applicationsMap = + new HashMap(); - Set pendingApplications; + Set pendingApplications; private final Resource minimumAllocation; private final Resource maximumAllocation; @@ -126,7 +126,7 @@ public class LeafQueue implements CSQueue { public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, - Comparator applicationComparator, CSQueue old) { + Comparator applicationComparator, CSQueue old) { this.scheduler = cs; this.queueName = queueName; this.parent = parent; @@ -199,8 +199,8 @@ public LeafQueue(CapacitySchedulerContext cs, } this.pendingApplications = - new TreeSet(applicationComparator); - this.activeApplications = new TreeSet(applicationComparator); + new TreeSet(applicationComparator); + this.activeApplications = new TreeSet(applicationComparator); } private synchronized void setupQueueConfigs( @@ -580,7 +580,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { } @Override - public void submitApplication(SchedulerApp application, String userName, + public void submitApplication(FiCaSchedulerApp application, String userName, String queue) throws AccessControlException { // Careful! Locking order is important! @@ -644,9 +644,9 @@ public void submitApplication(SchedulerApp application, String userName, } private synchronized void activateApplications() { - for (Iterator i=pendingApplications.iterator(); + for (Iterator i=pendingApplications.iterator(); i.hasNext(); ) { - SchedulerApp application = i.next(); + FiCaSchedulerApp application = i.next(); // Check queue limit if (getNumActiveApplications() >= getMaximumActiveApplications()) { @@ -666,7 +666,7 @@ private synchronized void activateApplications() { } } - private synchronized void addApplication(SchedulerApp application, User user) { + private synchronized void addApplication(FiCaSchedulerApp application, User user) { // Accept user.submitApplication(); pendingApplications.add(application); @@ -686,7 +686,7 @@ private synchronized void addApplication(SchedulerApp application, User user) { } @Override - public void finishApplication(SchedulerApp application, String queue) { + public void finishApplication(FiCaSchedulerApp application, String queue) { // Careful! Locking order is important! synchronized (this) { removeApplication(application, getUser(application.getUser())); @@ -696,7 +696,7 @@ public void finishApplication(SchedulerApp application, String queue) { parent.finishApplication(application, queue); } - public synchronized void removeApplication(SchedulerApp application, User user) { + public synchronized void removeApplication(FiCaSchedulerApp application, User user) { boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); @@ -728,7 +728,7 @@ public synchronized void removeApplication(SchedulerApp application, User user) ); } - private synchronized SchedulerApp getApplication( + private synchronized FiCaSchedulerApp getApplication( ApplicationAttemptId applicationAttemptId) { return applicationsMap.get(applicationAttemptId); } @@ -738,7 +738,7 @@ private synchronized SchedulerApp getApplication( @Override public synchronized CSAssignment - assignContainers(Resource clusterResource, SchedulerNode node) { + assignContainers(Resource clusterResource, FiCaSchedulerNode node) { if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getHostName() @@ -748,7 +748,7 @@ private synchronized SchedulerApp getApplication( // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - SchedulerApp application = + FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); return new CSAssignment( assignReservedContainer(application, node, reservedContainer, @@ -758,7 +758,7 @@ private synchronized SchedulerApp getApplication( } // Try to assign containers to applications in order - for (SchedulerApp application : activeApplications) { + for (FiCaSchedulerApp application : activeApplications) { if(LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " @@ -836,8 +836,8 @@ private synchronized SchedulerApp getApplication( } - private synchronized Resource assignReservedContainer(SchedulerApp application, - SchedulerNode node, RMContainer rmContainer, Resource clusterResource) { + private synchronized Resource assignReservedContainer(FiCaSchedulerApp application, + FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) { // Do we still need this reservation? Priority priority = rmContainer.getReservedPriority(); if (application.getTotalRequiredResources(priority) == 0) { @@ -880,9 +880,9 @@ private synchronized boolean assignToQueue(Resource clusterResource, return true; } - @Lock({LeafQueue.class, SchedulerApp.class}) + @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( - SchedulerApp application, Resource clusterResource, Resource required) { + FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); @@ -919,7 +919,7 @@ private Resource computeUserLimitAndSetHeadroom( } @Lock(NoLock.class) - private Resource computeUserLimit(SchedulerApp application, + private Resource computeUserLimit(FiCaSchedulerApp application, Resource clusterResource, Resource required) { // What is our current capacity? // * It is equal to the max(required, queue-capacity) if @@ -1007,7 +1007,7 @@ static int divideAndCeil(int a, int b) { return (a + (b - 1)) / b; } - boolean needContainers(SchedulerApp application, Priority priority, Resource required) { + boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) { int requiredContainers = application.getTotalRequiredResources(priority); int reservedContainers = application.getNumReservedContainers(priority); int starvation = 0; @@ -1036,7 +1036,7 @@ boolean needContainers(SchedulerApp application, Priority priority, Resource req } private CSAssignment assignContainersOnNode(Resource clusterResource, - SchedulerNode node, SchedulerApp application, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { Resource assigned = Resources.none(); @@ -1065,7 +1065,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, } private Resource assignNodeLocalContainers(Resource clusterResource, - SchedulerNode node, SchedulerApp application, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, node.getHostName()); @@ -1081,7 +1081,7 @@ private Resource assignNodeLocalContainers(Resource clusterResource, } private Resource assignRackLocalContainers(Resource clusterResource, - SchedulerNode node, SchedulerApp application, Priority priority, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, node.getRackName()); @@ -1095,8 +1095,8 @@ private Resource assignRackLocalContainers(Resource clusterResource, return Resources.none(); } - private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, - SchedulerApp application, Priority priority, + private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { ResourceRequest request = application.getResourceRequest(priority, RMNode.ANY); @@ -1111,8 +1111,8 @@ private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNo return Resources.none(); } - boolean canAssign(SchedulerApp application, Priority priority, - SchedulerNode node, NodeType type, RMContainer reservedContainer) { + boolean canAssign(FiCaSchedulerApp application, Priority priority, + FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) { // Clearly we need containers for this application... if (type == NodeType.OFF_SWITCH) { @@ -1159,14 +1159,14 @@ boolean canAssign(SchedulerApp application, Priority priority, } private Container getContainer(RMContainer rmContainer, - SchedulerApp application, SchedulerNode node, + FiCaSchedulerApp application, FiCaSchedulerNode node, Resource capability, Priority priority) { return (rmContainer != null) ? rmContainer.getContainer() : createContainer(application, node, capability, priority); } - public Container createContainer(SchedulerApp application, SchedulerNode node, + public Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, Resource capability, Priority priority) { NodeId nodeId = node.getRMNode().getNodeID(); @@ -1192,8 +1192,8 @@ public Container createContainer(SchedulerApp application, SchedulerNode node, return container; } - private Resource assignContainer(Resource clusterResource, SchedulerNode node, - SchedulerApp application, Priority priority, + private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getHostName() @@ -1267,8 +1267,8 @@ private Resource assignContainer(Resource clusterResource, SchedulerNode node, } } - private void reserve(SchedulerApp application, Priority priority, - SchedulerNode node, RMContainer rmContainer, Container container) { + private void reserve(FiCaSchedulerApp application, Priority priority, + FiCaSchedulerNode node, RMContainer rmContainer, Container container) { // Update reserved metrics if this is the first reservation if (rmContainer == null) { getMetrics().reserveResource( @@ -1282,8 +1282,8 @@ private void reserve(SchedulerApp application, Priority priority, node.reserveResource(application, priority, rmContainer); } - private void unreserve(SchedulerApp application, Priority priority, - SchedulerNode node, RMContainer rmContainer) { + private void unreserve(FiCaSchedulerApp application, Priority priority, + FiCaSchedulerNode node, RMContainer rmContainer) { // Done with the reservation? application.unreserve(node, priority); node.unreserveResource(application); @@ -1296,7 +1296,7 @@ private void unreserve(SchedulerApp application, Priority priority, @Override public void completedContainer(Resource clusterResource, - SchedulerApp application, SchedulerNode node, RMContainer rmContainer, + FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! @@ -1338,7 +1338,7 @@ public void completedContainer(Resource clusterResource, } synchronized void allocateResource(Resource clusterResource, - SchedulerApp application, Resource resource) { + FiCaSchedulerApp application, Resource resource) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1363,7 +1363,7 @@ synchronized void allocateResource(Resource clusterResource, } synchronized void releaseResource(Resource clusterResource, - SchedulerApp application, Resource resource) { + FiCaSchedulerApp application, Resource resource) { // Update queue metrics Resources.subtractFrom(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1401,7 +1401,7 @@ public synchronized void updateClusterResource(Resource clusterResource) { this, parent, clusterResource, minimumAllocation); // Update application properties - for (SchedulerApp application : activeApplications) { + for (FiCaSchedulerApp application : activeApplications) { synchronized (application) { computeUserLimitAndSetHeadroom(application, clusterResource, Resources.none()); @@ -1464,7 +1464,7 @@ public synchronized void releaseContainer(Resource resource) { @Override public void recoverContainer(Resource clusterResource, - SchedulerApp application, Container container) { + FiCaSchedulerApp application, Container container) { // Careful! Locking order is important! synchronized (this) { allocateResource(clusterResource, application, container.getResource()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index bd7e988bf05..819fb5c9b24 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -51,8 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @Private @Evolving @@ -421,7 +421,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { } @Override - public void submitApplication(SchedulerApp application, String user, + public void submitApplication(FiCaSchedulerApp application, String user, String queue) throws AccessControlException { synchronized (this) { @@ -453,7 +453,7 @@ public void submitApplication(SchedulerApp application, String user, } } - private synchronized void addApplication(SchedulerApp application, + private synchronized void addApplication(FiCaSchedulerApp application, String user) { ++numApplications; @@ -466,7 +466,7 @@ private synchronized void addApplication(SchedulerApp application, } @Override - public void finishApplication(SchedulerApp application, String queue) { + public void finishApplication(FiCaSchedulerApp application, String queue) { synchronized (this) { removeApplication(application, application.getUser()); @@ -478,7 +478,7 @@ public void finishApplication(SchedulerApp application, String queue) { } } - public synchronized void removeApplication(SchedulerApp application, + public synchronized void removeApplication(FiCaSchedulerApp application, String user) { --numApplications; @@ -516,7 +516,7 @@ synchronized void setMaxCapacity(float maximumCapacity) { @Override public synchronized CSAssignment assignContainers( - Resource clusterResource, SchedulerNode node) { + Resource clusterResource, FiCaSchedulerNode node) { CSAssignment assignment = new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); @@ -594,14 +594,14 @@ private synchronized boolean assignToQueue(Resource clusterResource) { } - private boolean canAssign(SchedulerNode node) { + private boolean canAssign(FiCaSchedulerNode node) { return (node.getReservedContainer() == null) && Resources.greaterThanOrEqual(node.getAvailableResource(), minimumAllocation); } synchronized CSAssignment assignContainersToChildQueues(Resource cluster, - SchedulerNode node) { + FiCaSchedulerNode node) { CSAssignment assignment = new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); @@ -654,7 +654,7 @@ void printChildQueues() { @Override public void completedContainer(Resource clusterResource, - SchedulerApp application, SchedulerNode node, + FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! @@ -715,7 +715,7 @@ public QueueMetrics getMetrics() { @Override public void recoverContainer(Resource clusterResource, - SchedulerApp application, Container container) { + FiCaSchedulerApp application, Container container) { // Careful! Locking order is important! synchronized (this) { allocateResource(clusterResource, container.getResource()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java similarity index 93% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index c8ed2c08554..e53ca82071f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; import java.util.ArrayList; import java.util.Collection; @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -53,6 +54,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -63,9 +69,11 @@ * of this class. */ @SuppressWarnings("unchecked") -public class SchedulerApp { +@Private +@Unstable +public class FiCaSchedulerApp extends SchedulerApplication { - private static final Log LOG = LogFactory.getLog(SchedulerApp.class); + private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class); private final RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); @@ -101,7 +109,7 @@ public class SchedulerApp { .newRecordInstance(Resource.class); private final RMContext rmContext; - public SchedulerApp(ApplicationAttemptId applicationAttemptId, + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext, ApplicationStore store) { this.rmContext = rmContext; @@ -115,6 +123,7 @@ public ApplicationId getApplicationId() { return this.appSchedulingInfo.getApplicationId(); } + @Override public ApplicationAttemptId getApplicationAttemptId() { return this.appSchedulingInfo.getApplicationAttemptId(); } @@ -156,6 +165,7 @@ public Resource getResource(Priority priority) { * Is this application pending? * @return true if it is else false. */ + @Override public boolean isPending() { return this.appSchedulingInfo.isPending(); } @@ -168,6 +178,7 @@ public String getQueueName() { * Get the list of live containers * @return All of the live containers */ + @Override public synchronized Collection getLiveContainers() { return new ArrayList(liveContainers.values()); } @@ -222,7 +233,7 @@ synchronized public void containerCompleted(RMContainer rmContainer, Resources.subtractFrom(currentConsumption, containerResource); } - synchronized public RMContainer allocate(NodeType type, SchedulerNode node, + synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, Priority priority, ResourceRequest request, Container container) { @@ -347,7 +358,7 @@ public synchronized Resource getCurrentReservation() { return currentReservation; } - public synchronized RMContainer reserve(SchedulerNode node, Priority priority, + public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority, RMContainer rmContainer, Container container) { // Create RMContainer if necessary if (rmContainer == null) { @@ -384,7 +395,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority, return rmContainer; } - public synchronized void unreserve(SchedulerNode node, Priority priority) { + public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) { Map reservedContainers = this.reservedContainers.get(priority); RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); @@ -410,7 +421,7 @@ public synchronized void unreserve(SchedulerNode node, Priority priority) { * @param priority priority of reserved container * @return true is reserved, false if not */ - public synchronized boolean isReserved(SchedulerNode node, Priority priority) { + public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) { Map reservedContainers = this.reservedContainers.get(priority); if (reservedContainers != null) { @@ -434,6 +445,7 @@ public synchronized float getLocalityWaitFactor( * Get the list of reserved containers * @return All of the reserved containers. */ + @Override public synchronized List getReservedContainers() { List reservedContainers = new ArrayList(); for (Map.Entry> e : diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java new file mode 100644 index 00000000000..b07e9c6b1ac --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public class FiCaSchedulerNode extends SchedulerNode { + + private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); + + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Resource availableResource = recordFactory.newRecordInstance(Resource.class); + private Resource usedResource = recordFactory.newRecordInstance(Resource.class); + + private volatile int numContainers; + + private RMContainer reservedContainer; + + /* set of containers that are allocated containers */ + private final Map launchedContainers = + new HashMap(); + + private final RMNode rmNode; + + public static final String ANY = "*"; + + public FiCaSchedulerNode(RMNode node) { + this.rmNode = node; + this.availableResource.setMemory(node.getTotalCapability().getMemory()); + } + + public RMNode getRMNode() { + return this.rmNode; + } + + public NodeId getNodeID() { + return this.rmNode.getNodeID(); + } + + public String getHttpAddress() { + return this.rmNode.getHttpAddress(); + } + + @Override + public String getHostName() { + return this.rmNode.getHostName(); + } + + @Override + public String getRackName() { + return this.rmNode.getRackName(); + } + + /** + * The Scheduler has allocated containers on this node to the + * given application. + * + * @param applicationId application + * @param rmContainer allocated container + */ + public synchronized void allocateContainer(ApplicationId applicationId, + RMContainer rmContainer) { + Container container = rmContainer.getContainer(); + deductAvailableResource(container.getResource()); + ++numContainers; + + launchedContainers.put(container.getId(), rmContainer); + + LOG.info("Assigned container " + container.getId() + + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + + getAvailableResource() + " available"); + } + + @Override + public synchronized Resource getAvailableResource() { + return this.availableResource; + } + + @Override + public synchronized Resource getUsedResource() { + return this.usedResource; + } + + private synchronized boolean isValidContainer(Container c) { + if (launchedContainers.containsKey(c.getId())) + return true; + return false; + } + + private synchronized void updateResource(Container container) { + addAvailableResource(container.getResource()); + --numContainers; + } + + /** + * Release an allocated container on this node. + * @param container container to be released + */ + public synchronized void releaseContainer(Container container) { + if (!isValidContainer(container)) { + LOG.error("Invalid container released " + container); + return; + } + + /* remove the containers from the nodemanger */ + launchedContainers.remove(container.getId()); + updateResource(container); + + LOG.info("Released container " + container.getId() + + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available" + ", release resources=" + true); + } + + + private synchronized void addAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid resource addition of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.addTo(availableResource, resource); + Resources.subtractFrom(usedResource, resource); + } + + private synchronized void deductAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.subtractFrom(availableResource, resource); + Resources.addTo(usedResource, resource); + } + + @Override + public String toString() { + return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + + " available=" + getAvailableResource().getMemory() + + " used=" + getUsedResource().getMemory(); + } + + @Override + public int getNumContainers() { + return numContainers; + } + + public synchronized List getRunningContainers() { + return new ArrayList(launchedContainers.values()); + } + + public synchronized void reserveResource( + SchedulerApplication application, Priority priority, + RMContainer reservedContainer) { + // Check if it's already reserved + if (this.reservedContainer != null) { + // Sanity check + if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { + throw new IllegalStateException("Trying to reserve" + + " container " + reservedContainer + + " on node " + reservedContainer.getReservedNode() + + " when currently" + " reserved resource " + this.reservedContainer + + " on node " + this.reservedContainer.getReservedNode()); + } + + // Cannot reserve more than one application on a given node! + if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( + reservedContainer.getContainer().getId().getApplicationAttemptId())) { + throw new IllegalStateException("Trying to reserve" + + " container " + reservedContainer + + " for application " + application.getApplicationAttemptId() + + " when currently" + + " reserved container " + this.reservedContainer + + " on node " + this); + } + + LOG.info("Updated reserved container " + + reservedContainer.getContainer().getId() + " on node " + + this + " for application " + application); + } else { + LOG.info("Reserved container " + reservedContainer.getContainer().getId() + + " on node " + this + " for application " + application); + } + this.reservedContainer = reservedContainer; + } + + public synchronized void unreserveResource( + SchedulerApplication application) { + // Cannot unreserve for wrong application... + ApplicationAttemptId reservedApplication = + reservedContainer.getContainer().getId().getApplicationAttemptId(); + if (!reservedApplication.equals( + application.getApplicationAttemptId())) { + throw new IllegalStateException("Trying to unreserve " + + " for application " + application.getApplicationAttemptId() + + " when currently reserved " + + " for application " + reservedApplication.getApplicationId() + + " on node " + this); + } + + reservedContainer = null; + } + + public synchronized RMContainer getReservedContainer() { + return reservedContainer; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 7f253abd214..7eb3be2d02a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -37,8 +37,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -69,7 +67,7 @@ public String getName() { return app.getApplicationId().toString(); } - public SchedulerApp getApp() { + public FSSchedulerApp getApp() { return app; } @@ -150,7 +148,8 @@ public void setRunnable(boolean runnable) { * given appliction on the given node with the given capability and * priority. */ - public Container createContainer(SchedulerApp application, SchedulerNode node, + public Container createContainer( + FSSchedulerApp application, FSSchedulerNode node, Resource capability, Priority priority) { NodeId nodeId = node.getRMNode().getNodeID(); @@ -180,10 +179,10 @@ public Container createContainer(SchedulerApp application, SchedulerNode node, * Reserve a spot for {@code container} on this {@code node}. If * the container is {@code alreadyReserved} on the node, simply * update relevant bookeeping. This dispatches ro relevant handlers - * in the {@link SchedulerNode} and {@link SchedulerApp} classes. + * in the {@link FSSchedulerNode} and {@link SchedulerApp} classes. */ - private void reserve(SchedulerApp application, Priority priority, - SchedulerNode node, Container container, boolean alreadyReserved) { + private void reserve(FSSchedulerApp application, Priority priority, + FSSchedulerNode node, Container container, boolean alreadyReserved) { LOG.info("Making reservation: node=" + node.getHostName() + " app_id=" + app.getApplicationId()); if (!alreadyReserved) { @@ -209,8 +208,8 @@ private void reserve(SchedulerApp application, Priority priority, * {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode * handlers for an unreservation. */ - private void unreserve(SchedulerApp application, Priority priority, - SchedulerNode node) { + private void unreserve(FSSchedulerApp application, Priority priority, + FSSchedulerNode node) { RMContainer rmContainer = node.getReservedContainer(); application.unreserve(node, priority); node.unreserveResource(application); @@ -225,8 +224,8 @@ private void unreserve(SchedulerApp application, Priority priority, * not have enough memory, create a reservation. This is called once we are * sure the particular request should be facilitated by this node. */ - private Resource assignContainer(SchedulerNode node, - SchedulerApp application, Priority priority, + private Resource assignContainer(FSSchedulerNode node, + FSSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, boolean reserved) { // How much does this request need? @@ -282,7 +281,7 @@ private Resource assignContainer(SchedulerNode node, @Override - public Resource assignContainer(SchedulerNode node, boolean reserved) { + public Resource assignContainer(FSSchedulerNode node, boolean reserved) { LOG.info("Node offered to app: " + getName() + " reserved: " + reserved); if (reserved) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index d7e6e7032b2..100fbf2af0c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; /** * A queue containing several applications. @@ -35,7 +34,8 @@ public class FSQueue { private String name; /** Applications in this specific queue; does not include children queues' jobs. */ - private Collection applications = new ArrayList(); + private Collection applications = + new ArrayList(); /** Scheduling mode for jobs inside the queue (fair or FIFO) */ private SchedulingMode schedulingMode; @@ -50,7 +50,7 @@ public FSQueue(FairScheduler scheduler, String name) { this.scheduler = scheduler; } - public Collection getApplications() { + public Collection getApplications() { return applications; } @@ -59,7 +59,7 @@ public void addApp(FSSchedulerApp app) { queueSchedulable.addApp(new AppSchedulable(scheduler, app, this)); } - public void removeJob(SchedulerApp app) { + public void removeJob(FSSchedulerApp app) { applications.remove(app); queueSchedulable.removeApp(app); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java index 62d39ef47a0..33625a77e64 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java @@ -45,8 +45,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @Private @Unstable @@ -80,7 +78,7 @@ public void addApp(AppSchedulable app) { appScheds.add(app); } - public void removeApp(SchedulerApp app) { + public void removeApp(FSSchedulerApp app) { for (Iterator it = appScheds.iterator(); it.hasNext();) { AppSchedulable appSched = it.next(); if (appSched.getApp() == app) { @@ -146,7 +144,7 @@ public long getStartTime() { } @Override - public Resource assignContainer(SchedulerNode node, boolean reserved) { + public Resource assignContainer(FSSchedulerNode node, boolean reserved) { LOG.debug("Node offered to queue: " + this.getName() + " reserved: " + reserved); // If this queue is over its limit, reject if (Resources.greaterThan(this.getResourceUsage(), diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index de18e3abc56..b9931df86bf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -18,31 +18,410 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -/** - * This class extends the application lifecycle management contained with - * the {@link SchedulerApp} class and adds delay-scheduling information - * specific to the FairScheduler. - */ -public class FSSchedulerApp extends SchedulerApp { - private static final Log LOG = LogFactory.getLog(SchedulerApp.class); +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; + +public class FSSchedulerApp extends SchedulerApplication { + + private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private final AppSchedulingInfo appSchedulingInfo; + private final Queue queue; + + private final Resource currentConsumption = recordFactory + .newRecordInstance(Resource.class); + private Resource resourceLimit = recordFactory + .newRecordInstance(Resource.class); + + private Map liveContainers + = new HashMap(); + private List newlyAllocatedContainers = + new ArrayList(); + + final Map> reservedContainers = + new HashMap>(); + + /** + * Count how many times the application has been given an opportunity + * to schedule a task at each priority. Each time the scheduler + * asks the application for a task at this priority, it is incremented, + * and each time the application successfully schedules a task, it + * is reset to 0. + */ + Multiset schedulingOpportunities = HashMultiset.create(); + + Multiset reReservations = HashMultiset.create(); + + Resource currentReservation = recordFactory + .newRecordInstance(Resource.class); + + private final RMContext rmContext; + public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, ApplicationStore store) { + this.rmContext = rmContext; + this.appSchedulingInfo = + new AppSchedulingInfo(applicationAttemptId, user, queue, + activeUsersManager, store); + this.queue = queue; + } + + public ApplicationId getApplicationId() { + return this.appSchedulingInfo.getApplicationId(); + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return this.appSchedulingInfo.getApplicationAttemptId(); + } + + public String getUser() { + return this.appSchedulingInfo.getUser(); + } + + public synchronized void updateResourceRequests( + List requests) { + this.appSchedulingInfo.updateResourceRequests(requests); + } + + public Map getResourceRequests(Priority priority) { + return this.appSchedulingInfo.getResourceRequests(priority); + } + + public int getNewContainerId() { + return this.appSchedulingInfo.getNewContainerId(); + } + + public Collection getPriorities() { + return this.appSchedulingInfo.getPriorities(); + } + + public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) { + return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress); + } + + public synchronized int getTotalRequiredResources(Priority priority) { + return getResourceRequest(priority, RMNode.ANY).getNumContainers(); + } + + public Resource getResource(Priority priority) { + return this.appSchedulingInfo.getResource(priority); + } + + /** + * Is this application pending? + * @return true if it is else false. + */ + @Override + public boolean isPending() { + return this.appSchedulingInfo.isPending(); + } + + public String getQueueName() { + return this.appSchedulingInfo.getQueueName(); + } + + /** + * Get the list of live containers + * @return All of the live containers + */ + @Override + public synchronized Collection getLiveContainers() { + return new ArrayList(liveContainers.values()); + } + + public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { + // Cleanup all scheduling information + this.appSchedulingInfo.stop(rmAppAttemptFinalState); + } + + @SuppressWarnings("unchecked") + public synchronized void containerLaunchedOnNode(ContainerId containerId, + NodeId nodeId) { + // Inform the container + RMContainer rmContainer = + getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + } + + synchronized public void containerCompleted(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + + Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); + + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent( + containerId, + containerStatus, + event) + ); + LOG.info("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + + // Remove from the list of containers + liveContainers.remove(rmContainer.getContainerId()); + + RMAuditLogger.logSuccess(getUser(), + AuditConstants.RELEASE_CONTAINER, "SchedulerApp", + getApplicationId(), containerId); + + // Update usage metrics + Resource containerResource = rmContainer.getContainer().getResource(); + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + Resources.subtractFrom(currentConsumption, containerResource); + } + + synchronized public List pullNewlyAllocatedContainers() { + List returnContainerList = new ArrayList( + newlyAllocatedContainers.size()); + for (RMContainer rmContainer : newlyAllocatedContainers) { + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + returnContainerList.add(rmContainer.getContainer()); + } + newlyAllocatedContainers.clear(); + return returnContainerList; + } + + public Resource getCurrentConsumption() { + return this.currentConsumption; + } + + synchronized public void showRequests() { + if (LOG.isDebugEnabled()) { + for (Priority priority : getPriorities()) { + Map requests = getResourceRequests(priority); + if (requests != null) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " headRoom=" + getHeadroom() + + " currentConsumption=" + currentConsumption.getMemory()); + for (ResourceRequest request : requests.values()) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " request=" + request); + } + } + } + } + } + + public synchronized RMContainer getRMContainer(ContainerId id) { + return liveContainers.get(id); + } + + synchronized public void addSchedulingOpportunity(Priority priority) { + this.schedulingOpportunities.setCount(priority, + schedulingOpportunities.count(priority) + 1); + } + + /** + * Return the number of times the application has been given an opportunity + * to schedule a task at the given priority since the last time it + * successfully did so. + */ + synchronized public int getSchedulingOpportunities(Priority priority) { + return this.schedulingOpportunities.count(priority); + } + + synchronized void resetReReservations(Priority priority) { + this.reReservations.setCount(priority, 0); + } + + synchronized void addReReservation(Priority priority) { + this.reReservations.add(priority); + } + + synchronized public int getReReservations(Priority priority) { + return this.reReservations.count(priority); + } + + public synchronized int getNumReservedContainers(Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + return (reservedContainers == null) ? 0 : reservedContainers.size(); + } + + /** + * Get total current reservations. + * Used only by unit tests + * @return total current reservations + */ + @Stable + @Private + public synchronized Resource getCurrentReservation() { + return currentReservation; + } + + public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority, + RMContainer rmContainer, Container container) { + // Create RMContainer if necessary + if (rmContainer == null) { + rmContainer = + new RMContainerImpl(container, getApplicationAttemptId(), + node.getNodeID(), rmContext.getDispatcher().getEventHandler(), + rmContext.getContainerAllocationExpirer()); + + Resources.addTo(currentReservation, container.getResource()); + + // Reset the re-reservation count + resetReReservations(priority); + } else { + // Note down the re-reservation + addReReservation(priority); + } + rmContainer.handle(new RMContainerReservedEvent(container.getId(), + container.getResource(), node.getNodeID(), priority)); + + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers == null) { + reservedContainers = new HashMap(); + this.reservedContainers.put(priority, reservedContainers); + } + reservedContainers.put(node.getNodeID(), rmContainer); + + LOG.info("Application " + getApplicationId() + + " reserved container " + rmContainer + + " on node " + node + ", currently has " + reservedContainers.size() + + " at priority " + priority + + "; currentReservation " + currentReservation.getMemory()); + + return rmContainer; + } + + public synchronized void unreserve(FSSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(priority); + } + + // Reset the re-reservation count + resetReReservations(priority); + + Resource resource = reservedContainer.getContainer().getResource(); + Resources.subtractFrom(currentReservation, resource); + + LOG.info("Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + " at priority " + + priority + "; currentReservation " + currentReservation); + } + + /** + * Has the application reserved the given node at the + * given priority? + * @param node node to be checked + * @param priority priority of reserved container + * @return true is reserved, false if not + */ + public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers != null) { + return reservedContainers.containsKey(node.getNodeID()); + } + return false; + } + + public synchronized float getLocalityWaitFactor( + Priority priority, int clusterNodes) { + // Estimate: Required unique resources (i.e. hosts + racks) + int requiredResources = + Math.max(this.getResourceRequests(priority).size() - 1, 0); + + // waitFactor can't be more than '1' + // i.e. no point skipping more than clustersize opportunities + return Math.min(((float)requiredResources / clusterNodes), 1.0f); + } + + /** + * Get the list of reserved containers + * @return All of the reserved containers. + */ + @Override + public synchronized List getReservedContainers() { + List reservedContainers = new ArrayList(); + for (Map.Entry> e : + this.reservedContainers.entrySet()) { + reservedContainers.addAll(e.getValue().values()); + } + return reservedContainers; + } + + public synchronized void setHeadroom(Resource globalLimit) { + this.resourceLimit = globalLimit; + } + + /** + * Get available headroom in terms of resources for the application's user. + * @return available resource headroom + */ + public synchronized Resource getHeadroom() { + // Corner case to deal with applications being slightly over-limit + if (resourceLimit.getMemory() < 0) { + resourceLimit.setMemory(0); + } + + return resourceLimit; + } + + public Queue getQueue() { + return queue; + } /** * Delay scheduling: We often want to prioritize scheduling of node-local @@ -62,13 +441,6 @@ public class FSSchedulerApp extends SchedulerApp { // Time of the last container scheduled at the current allowed level Map lastScheduledContainer = new HashMap(); - public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, - RMContext rmContext, ApplicationStore store) { - super(applicationAttemptId, user, queue, activeUsersManager, - rmContext, store); - } - /** * Should be called when an application has successfully scheduled a container, * or when the scheduling locality threshold is relaxed. @@ -78,7 +450,7 @@ public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, */ synchronized public void resetSchedulingOpportunities(Priority priority) { this.lastScheduledContainer.put(priority, System.currentTimeMillis()); - super.resetSchedulingOpportunities(priority); + this.schedulingOpportunities.setCount(priority, 0); } /** @@ -127,7 +499,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL)) { } - synchronized public RMContainer allocate(NodeType type, SchedulerNode node, + synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, Container container) { // Update allowed locality level @@ -143,7 +515,42 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && this.resetAllowedLocalityLevel(priority, type); } } - return super.allocate(type, node, priority, request, container); + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(priority) <= 0) { + return null; + } + + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, this + .getApplicationAttemptId(), node.getNodeID(), this.rmContext + .getDispatcher().getEventHandler(), this.rmContext + .getContainerAllocationExpirer()); + + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); + + // Update consumption and track allocations + appSchedulingInfo.allocate(type, node, priority, request, container); + Resources.addTo(currentConsumption, container.getResource()); + + // Inform the container + rmContainer.handle( + new RMContainerEvent(container.getId(), RMContainerEventType.START)); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + + container.getId().getApplicationAttemptId() + + " container=" + container.getId() + " host=" + + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), + AuditConstants.ALLOC_CONTAINER, "SchedulerApp", + getApplicationId(), container.getId()); + + return rmContainer; } /** diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java new file mode 100644 index 00000000000..b8cef425d0e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public class FSSchedulerNode extends SchedulerNode { + + private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); + + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Resource availableResource = recordFactory.newRecordInstance(Resource.class); + private Resource usedResource = recordFactory.newRecordInstance(Resource.class); + + private volatile int numContainers; + + private RMContainer reservedContainer; + + /* set of containers that are allocated containers */ + private final Map launchedContainers = + new HashMap(); + + private final RMNode rmNode; + + public static final String ANY = "*"; + + public FSSchedulerNode(RMNode node) { + this.rmNode = node; + this.availableResource.setMemory(node.getTotalCapability().getMemory()); + } + + public RMNode getRMNode() { + return this.rmNode; + } + + public NodeId getNodeID() { + return this.rmNode.getNodeID(); + } + + public String getHttpAddress() { + return this.rmNode.getHttpAddress(); + } + + @Override + public String getHostName() { + return this.rmNode.getHostName(); + } + + @Override + public String getRackName() { + return this.rmNode.getRackName(); + } + + /** + * The Scheduler has allocated containers on this node to the + * given application. + * + * @param applicationId application + * @param rmContainer allocated container + */ + public synchronized void allocateContainer(ApplicationId applicationId, + RMContainer rmContainer) { + Container container = rmContainer.getContainer(); + deductAvailableResource(container.getResource()); + ++numContainers; + + launchedContainers.put(container.getId(), rmContainer); + + LOG.info("Assigned container " + container.getId() + + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + + getAvailableResource() + " available"); + } + + @Override + public synchronized Resource getAvailableResource() { + return this.availableResource; + } + + @Override + public synchronized Resource getUsedResource() { + return this.usedResource; + } + + private synchronized boolean isValidContainer(Container c) { + if (launchedContainers.containsKey(c.getId())) + return true; + return false; + } + + private synchronized void updateResource(Container container) { + addAvailableResource(container.getResource()); + --numContainers; + } + + /** + * Release an allocated container on this node. + * @param container container to be released + */ + public synchronized void releaseContainer(Container container) { + if (!isValidContainer(container)) { + LOG.error("Invalid container released " + container); + return; + } + + /* remove the containers from the nodemanger */ + launchedContainers.remove(container.getId()); + updateResource(container); + + LOG.info("Released container " + container.getId() + + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available" + ", release resources=" + true); + } + + + private synchronized void addAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid resource addition of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.addTo(availableResource, resource); + Resources.subtractFrom(usedResource, resource); + } + + private synchronized void deductAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.subtractFrom(availableResource, resource); + Resources.addTo(usedResource, resource); + } + + @Override + public String toString() { + return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + + " available=" + getAvailableResource().getMemory() + + " used=" + getUsedResource().getMemory(); + } + + @Override + public int getNumContainers() { + return numContainers; + } + + public synchronized List getRunningContainers() { + return new ArrayList(launchedContainers.values()); + } + + public synchronized void reserveResource( + FSSchedulerApp application, Priority priority, + RMContainer reservedContainer) { + // Check if it's already reserved + if (this.reservedContainer != null) { + // Sanity check + if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { + throw new IllegalStateException("Trying to reserve" + + " container " + reservedContainer + + " on node " + reservedContainer.getReservedNode() + + " when currently" + " reserved resource " + this.reservedContainer + + " on node " + this.reservedContainer.getReservedNode()); + } + + // Cannot reserve more than one application on a given node! + if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( + reservedContainer.getContainer().getId().getApplicationAttemptId())) { + throw new IllegalStateException("Trying to reserve" + + " container " + reservedContainer + + " for application " + application.getApplicationId() + + " when currently" + + " reserved container " + this.reservedContainer + + " on node " + this); + } + + LOG.info("Updated reserved container " + + reservedContainer.getContainer().getId() + " on node " + + this + " for application " + application); + } else { + LOG.info("Reserved container " + reservedContainer.getContainer().getId() + + " on node " + this + " for application " + application); + } + this.reservedContainer = reservedContainer; + } + + public synchronized void unreserveResource( + FSSchedulerApp application) { + // Cannot unreserve for wrong application... + ApplicationAttemptId reservedApplication = + reservedContainer.getContainer().getId().getApplicationAttemptId(); + if (!reservedApplication.equals( + application.getApplicationAttemptId())) { + throw new IllegalStateException("Trying to unreserve " + + " for application " + application.getApplicationId() + + " when currently reserved " + + " for application " + reservedApplication.getApplicationId() + + " on node " + this); + } + + reservedContainer = null; + } + + public synchronized RMContainer getReservedContainer() { + return reservedContainer; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2b311f45e2a..46ed93724e4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -63,9 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -116,12 +114,12 @@ public class FairScheduler implements ResourceScheduler { // This stores per-application scheduling information, indexed by // attempt ID's for fast lookup. - protected Map applications - = new HashMap(); + protected Map applications + = new HashMap(); // Nodes in the cluster, indexed by NodeId - private Map nodes = - new ConcurrentHashMap(); + private Map nodes = + new ConcurrentHashMap(); // Aggregate capacity of the cluster private Resource clusterCapacity = @@ -158,7 +156,7 @@ public List getQueueSchedulables() { } private RMContainer getRMContainer(ContainerId containerId) { - SchedulerApp application = + FSSchedulerApp application = applications.get(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } @@ -294,7 +292,8 @@ protected void preemptResources(List scheds, Resource toPree if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) return; - Map apps = new HashMap(); + Map apps = + new HashMap(); Map queues = new HashMap(); // Collect running containers from over-scheduled queues @@ -526,7 +525,7 @@ private synchronized void removeApplication( LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = applications.get(applicationAttemptId); if (application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -576,7 +575,7 @@ private synchronized void completedContainer(RMContainer rmContainer, // Get the application for the finished container ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - SchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = applications.get(applicationAttemptId); if (application == null) { LOG.info("Container " + container + " of" + " unknown application " + applicationAttemptId + @@ -585,7 +584,7 @@ private synchronized void completedContainer(RMContainer rmContainer, } // Get the node on which the container was allocated - SchedulerNode node = nodes.get(container.getNodeId()); + FSSchedulerNode node = nodes.get(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { application.unreserve(node, rmContainer.getReservedPriority()); @@ -602,7 +601,7 @@ private synchronized void completedContainer(RMContainer rmContainer, } private synchronized void addNode(RMNode node) { - this.nodes.put(node.getNodeID(), new SchedulerNode(node)); + this.nodes.put(node.getNodeID(), new FSSchedulerNode(node)); Resources.addTo(clusterCapacity, node.getTotalCapability()); LOG.info("Added node " + node.getNodeAddress() + @@ -610,7 +609,7 @@ private synchronized void addNode(RMNode node) { } private synchronized void removeNode(RMNode rmNode) { - SchedulerNode node = this.nodes.get(rmNode.getNodeID()); + FSSchedulerNode node = this.nodes.get(rmNode.getNodeID()); Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); // Remove running containers @@ -643,7 +642,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release) { // Make sure this application exists - SchedulerApp application = applications.get(appAttemptId); + FSSchedulerApp application = applications.get(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -704,10 +703,10 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, * Process a container which has launched on a node, as reported by the * node. */ - private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { // Get the application for the finished container ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - SchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = applications.get(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + @@ -726,7 +725,7 @@ private synchronized void nodeUpdate(RMNode nm, List completedContainers) { LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); eventLog.log("HEARTBEAT", nm.getHostName()); - SchedulerNode node = nodes.get(nm.getNodeID()); + FSSchedulerNode node = nodes.get(nm.getNodeID()); // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { @@ -749,7 +748,7 @@ private synchronized void nodeUpdate(RMNode nm, // already, we try to complete the reservation. RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - SchedulerApp reservedApplication = + FSSchedulerApp reservedApplication = applications.get(reservedContainer.getApplicationAttemptId()); // Try to fulfill the reservation @@ -787,7 +786,7 @@ private synchronized void nodeUpdate(RMNode nm, @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { - SchedulerNode node = nodes.get(nodeId); + FSSchedulerNode node = nodes.get(nodeId); return node == null ? null : new SchedulerNodeReport(node); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 08c15a3d6e0..8785891f5ea 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; @@ -422,7 +421,7 @@ public synchronized void addApp(FSSchedulerApp app) { /** * Remove an app */ - public synchronized void removeJob(SchedulerApp app) { + public synchronized void removeJob(FSSchedulerApp app) { getQueue(app.getQueueName()).removeJob(app); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 628da699fea..c7f111aa2e5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -23,7 +23,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; /** * A Schedulable represents an entity that can launch tasks, such as a job @@ -104,7 +103,7 @@ abstract class Schedulable { * already exists on this node, and the schedulable should fulfill that * reservation if possible. */ - public abstract Resource assignContainer(SchedulerNode node, boolean reserved); + public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved); /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 36bbea565cc..f06207bc2b1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -71,11 +71,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; @@ -103,14 +103,14 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); private RMContext rmContext; - private Map nodes = new ConcurrentHashMap(); + private Map nodes = new ConcurrentHashMap(); private boolean initialized; private Resource minimumAllocation; private Resource maximumAllocation; - private Map applications - = new TreeMap(); + private Map applications + = new TreeMap(); private ActiveUsersManager activeUsersManager; @@ -223,7 +223,7 @@ public synchronized void reinitialize(Configuration conf, public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, List release) { - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + "or non existant application " + applicationAttemptId); @@ -276,7 +276,7 @@ public Allocation allocate( } } - private SchedulerApp getApplication( + private FiCaSchedulerApp getApplication( ApplicationAttemptId applicationAttemptId) { return applications.get(applicationAttemptId); } @@ -284,19 +284,19 @@ private SchedulerApp getApplication( @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId applicationAttemptId) { - SchedulerApp app = getApplication(applicationAttemptId); + FiCaSchedulerApp app = getApplication(applicationAttemptId); return app == null ? null : new SchedulerAppReport(app); } - private SchedulerNode getNode(NodeId nodeId) { + private FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } private synchronized void addApplication(ApplicationAttemptId appAttemptId, String user) { // TODO: Fix store - SchedulerApp schedulerApp = - new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, + FiCaSchedulerApp schedulerApp = + new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext, null); applications.put(appAttemptId, schedulerApp); metrics.submitApp(user, appAttemptId.getAttemptId()); @@ -311,7 +311,7 @@ private synchronized void doneApplication( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState) throws IOException { - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { throw new IOException("Unknown application " + applicationAttemptId + " has completed!"); @@ -344,15 +344,15 @@ private synchronized void doneApplication( * * @param node node on which resources are available to be allocated */ - private void assignContainers(SchedulerNode node) { + private void assignContainers(FiCaSchedulerNode node) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + " #applications=" + applications.size()); // Try to assign containers to applications in fifo order - for (Map.Entry e : applications + for (Map.Entry e : applications .entrySet()) { - SchedulerApp application = e.getValue(); + FiCaSchedulerApp application = e.getValue(); LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { @@ -383,15 +383,15 @@ private void assignContainers(SchedulerNode node) { // Update the applications' headroom to correctly take into // account the containers assigned in this update. - for (SchedulerApp application : applications.values()) { + for (FiCaSchedulerApp application : applications.values()) { application.setHeadroom(Resources.subtract(clusterResource, usedResource)); } } - private int getMaxAllocatableContainers(SchedulerApp application, - Priority priority, SchedulerNode node, NodeType type) { + private int getMaxAllocatableContainers(FiCaSchedulerApp application, + Priority priority, FiCaSchedulerNode node, NodeType type) { ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, SchedulerNode.ANY); + application.getResourceRequest(priority, FiCaSchedulerNode.ANY); int maxContainers = offSwitchRequest.getNumContainers(); if (type == NodeType.OFF_SWITCH) { @@ -420,8 +420,8 @@ private int getMaxAllocatableContainers(SchedulerApp application, } - private int assignContainersOnNode(SchedulerNode node, - SchedulerApp application, Priority priority + private int assignContainersOnNode(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority ) { // Data-local int nodeLocalContainers = @@ -447,8 +447,8 @@ private int assignContainersOnNode(SchedulerNode node, return (nodeLocalContainers + rackLocalContainers + offSwitchContainers); } - private int assignNodeLocalContainers(SchedulerNode node, - SchedulerApp application, Priority priority) { + private int assignNodeLocalContainers(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(priority, node.getRMNode().getNodeAddress()); @@ -473,15 +473,15 @@ private int assignNodeLocalContainers(SchedulerNode node, return assignedContainers; } - private int assignRackLocalContainers(SchedulerNode node, - SchedulerApp application, Priority priority) { + private int assignRackLocalContainers(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(priority, node.getRMNode().getRackName()); if (request != null) { // Don't allocate on this rack if the application doens't need containers ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, SchedulerNode.ANY); + application.getResourceRequest(priority, FiCaSchedulerNode.ANY); if (offSwitchRequest.getNumContainers() <= 0) { return 0; } @@ -498,11 +498,11 @@ private int assignRackLocalContainers(SchedulerNode node, return assignedContainers; } - private int assignOffSwitchContainers(SchedulerNode node, - SchedulerApp application, Priority priority) { + private int assignOffSwitchContainers(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = - application.getResourceRequest(priority, SchedulerNode.ANY); + application.getResourceRequest(priority, FiCaSchedulerNode.ANY); if (request != null) { assignedContainers = assignContainer(node, application, priority, @@ -511,7 +511,7 @@ private int assignOffSwitchContainers(SchedulerNode node, return assignedContainers; } - private int assignContainer(SchedulerNode node, SchedulerApp application, + private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, int assignableContainers, ResourceRequest request, NodeType type) { LOG.debug("assignContainers:" + @@ -577,7 +577,7 @@ private int assignContainer(SchedulerNode node, SchedulerApp application, private synchronized void nodeUpdate(RMNode rmNode, List newlyLaunchedContainers, List completedContainers) { - SchedulerNode node = getNode(rmNode.getNodeID()); + FiCaSchedulerNode node = getNode(rmNode.getNodeID()); // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { @@ -667,10 +667,10 @@ public void handle(SchedulerEvent event) { } } - private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { // Get the application for the finished container ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + @@ -696,10 +696,10 @@ private synchronized void containerCompleted(RMContainer rmContainer, // Get the application for the finished container Container container = rmContainer.getContainer(); ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); // Get the node on which the container was allocated - SchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + @@ -729,7 +729,7 @@ private synchronized void containerCompleted(RMContainer rmContainer, private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { - SchedulerNode node = getNode(nodeInfo.getNodeID()); + FiCaSchedulerNode node = getNode(nodeInfo.getNodeID()); if (node == null) { return; } @@ -761,7 +761,7 @@ public List getQueueUserAclInfo() { } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager)); + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); } @@ -778,12 +778,12 @@ public void recover(RMState state) { @Override public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) { - SchedulerNode node = getNode(nodeId); + FiCaSchedulerNode node = getNode(nodeId); return node == null ? null : new SchedulerNodeReport(node); } private RMContainer getRMContainer(ContainerId containerId) { - SchedulerApp application = + FiCaSchedulerApp application = getApplication(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index df1d8923f44..ca05363b600 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.BuilderUtils; @Private @@ -72,7 +72,7 @@ public class NodeManager implements ContainerManager { Resource used = recordFactory.newRecordInstance(Resource.class); final ResourceTrackerService resourceTrackerService; - final SchedulerNode schedulerNode; + final FiCaSchedulerNode schedulerNode; final Map> containers = new HashMap>(); @@ -98,7 +98,7 @@ public NodeManager(String hostName, int containerManagerPort, int httpPort, request.setNodeId(this.nodeId); resourceTrackerService.registerNodeManager(request) .getRegistrationResponse(); - this.schedulerNode = new SchedulerNode(rmContext.getRMNodes().get( + this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get( this.nodeId)); // Sanity check diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 3b682c92fca..8876bd338dd 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -40,8 +40,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -111,8 +111,8 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { LOG.info("Setup top-level queues a and b"); } - private SchedulerApp getMockApplication(int appId, String user) { - SchedulerApp application = mock(SchedulerApp.class); + private FiCaSchedulerApp getMockApplication(int appId, String user) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); doReturn(applicationAttemptId.getApplicationId()). @@ -209,7 +209,7 @@ public void testActiveApplicationLimits() throws Exception { int APPLICATION_ID = 0; // Submit first application - SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_0, user_0, A); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -217,7 +217,7 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit second application - SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_1, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -225,7 +225,7 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit third application, should remain pending - SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_2, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -240,7 +240,7 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit another one for user_0 - SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_3, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -251,7 +251,7 @@ public void testActiveApplicationLimits() throws Exception { doReturn(3).when(queue).getMaximumActiveApplications(); // Submit first app for user_1 - SchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); queue.submitApplication(app_4, user_1, A); assertEquals(3, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -261,7 +261,7 @@ public void testActiveApplicationLimits() throws Exception { assertEquals(0, queue.getNumPendingApplications(user_1)); // Submit second app for user_1, should block due to queue-limit - SchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); queue.submitApplication(app_5, user_1, A); assertEquals(3, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -290,7 +290,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { doReturn(2).when(queue).getMaximumActiveApplications(); // Submit first application - SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_0, user_0, A); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -299,7 +299,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertTrue(queue.activeApplications.contains(app_0)); // Submit second application - SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_1, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -308,7 +308,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertTrue(queue.activeApplications.contains(app_1)); // Submit third application, should remain pending - SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_2, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -317,7 +317,7 @@ public void testActiveLimitsWithKilledApps() throws Exception { assertTrue(queue.pendingApplications.contains(app_2)); // Submit fourth application, should remain pending - SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_3, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -393,7 +393,7 @@ public void testHeadroom() throws Exception { String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB); final String user_0 = "user_0"; final String user_1 = "user_1"; @@ -408,8 +408,8 @@ public void testHeadroom() throws Exception { // and check headroom final ApplicationAttemptId appAttemptId_0_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0_0 = - spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, + FiCaSchedulerApp app_0_0 = + spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_0, user_0, A); @@ -427,8 +427,8 @@ public void testHeadroom() throws Exception { // Submit second application from user_0, check headroom final ApplicationAttemptId appAttemptId_0_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_0_1 = - spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, + FiCaSchedulerApp app_0_1 = + spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_1, user_0, A); @@ -446,8 +446,8 @@ public void testHeadroom() throws Exception { // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_1_0 = - spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, + FiCaSchedulerApp app_1_0 = + spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_1_0, user_1, A); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index f74dab4459a..6e659cf7112 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -62,8 +62,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.junit.After; import org.junit.Before; @@ -171,14 +171,14 @@ static LeafQueue stubLeafQueue(LeafQueue queue) { @Override public Container answer(InvocationOnMock invocation) throws Throwable { - final SchedulerApp application = - (SchedulerApp)(invocation.getArguments()[0]); + final FiCaSchedulerApp application = + (FiCaSchedulerApp)(invocation.getArguments()[0]); final ContainerId containerId = TestUtils.getMockContainerId(application); Container container = TestUtils.getMockContainer( containerId, - ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(), + ((FiCaSchedulerNode)(invocation.getArguments()[1])).getNodeID(), (Resource)(invocation.getArguments()[2]), ((Priority)invocation.getArguments()[3])); return container; @@ -186,8 +186,8 @@ public Container answer(InvocationOnMock invocation) } ). when(queue).createContainer( - any(SchedulerApp.class), - any(SchedulerNode.class), + any(FiCaSchedulerApp.class), + any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class) ); @@ -195,7 +195,7 @@ public Container answer(InvocationOnMock invocation) // 2. Stub out LeafQueue.parent.completedContainer CSQueue parent = queue.getParent(); doNothing().when(parent).completedContainer( - any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class), + any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class)); @@ -238,22 +238,22 @@ public void testSingleQueueOneUserMetrics() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, B); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, B); // same user // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -284,14 +284,14 @@ public void testUserQueueAcl() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); - SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_d, d, null, + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null, rmContext, null); d.submitApplication(app_0, user_d, D); // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(0, 2); - SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_d, d, null, + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null, rmContext, null); d.submitApplication(app_1, user_d, D); // same user } @@ -309,7 +309,7 @@ public void testAppAttemptMetrics() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); - SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null, + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null, rmContext, null); a.submitApplication(app_0, user_0, B); @@ -324,7 +324,7 @@ public void testAppAttemptMetrics() throws Exception { // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(0, 2); - SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null, + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null, rmContext, null); a.submitApplication(app_1, user_0, B); // same user @@ -359,22 +359,22 @@ public void testSingleQueueWithOneUser() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, A); // same user // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -483,30 +483,30 @@ public void testUserLimits() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -576,30 +576,30 @@ public void testHeadroomWithMaxCap() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -687,35 +687,35 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); - SchedulerApp app_3 = - new SchedulerApp(appAttemptId_3, user_2, a, + FiCaSchedulerApp app_3 = + new FiCaSchedulerApp(appAttemptId_3, user_2, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_3, user_2, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -862,21 +862,21 @@ public void testReservation() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -961,23 +961,23 @@ public void testStolenReservedContainer() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -1060,24 +1060,24 @@ public void testReservationExchange() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -1175,23 +1175,23 @@ public void testLocalityScheduling() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); String host_1 = "host_1"; String rack_1 = "rack_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); String host_2 = "host_2"; String rack_2 = "rack_2"; - SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -1284,7 +1284,7 @@ public void testLocalityScheduling() throws Exception { assertEquals(1, app_0.getTotalRequiredResources(priority)); String host_3 = "host_3"; // on rack_1 - SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); + FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); assignment = a.assignContainers(clusterResource, node_3); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), @@ -1305,23 +1305,23 @@ public void testApplicationPriorityScheduling() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); String host_1 = "host_1"; String rack_1 = "rack_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); String host_2 = "host_2"; String rack_2 = "rack_2"; - SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -1435,22 +1435,22 @@ public void testSchedulingConstraints() throws Exception { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0_0 = "host_0_0"; String rack_0 = "rack_0"; - SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB); String host_0_1 = "host_0_1"; - SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); String host_1_0 = "host_1_0"; String rack_1 = "rack_1"; - SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index c4c48497d3c..64c75818626 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -99,22 +99,22 @@ private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) { LOG.info("Setup top-level queues a and b"); } - private SchedulerApp getMockApplication(int appId, String user) { - SchedulerApp application = mock(SchedulerApp.class); + private FiCaSchedulerApp getMockApplication(int appId, String user) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); doReturn(user).when(application).getUser(); doReturn(Resources.createResource(0)).when(application).getHeadroom(); return application; } private void stubQueueAllocation(final CSQueue queue, - final Resource clusterResource, final SchedulerNode node, + final Resource clusterResource, final FiCaSchedulerNode node, final int allocation) { stubQueueAllocation(queue, clusterResource, node, allocation, NodeType.NODE_LOCAL); } private void stubQueueAllocation(final CSQueue queue, - final Resource clusterResource, final SchedulerNode node, + final Resource clusterResource, final FiCaSchedulerNode node, final int allocation, final NodeType type) { // Simulate the queue allocation @@ -132,7 +132,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { ((ParentQueue)queue).allocateResource(clusterResource, allocatedResource); } else { - SchedulerApp app1 = getMockApplication(0, ""); + FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, allocatedResource); } @@ -198,9 +198,9 @@ public void testSingleLevelQueues() throws Exception { final int memoryPerNode = 10; final int numNodes = 2; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -224,9 +224,9 @@ public void testSingleLevelQueues() throws Exception { root.assignContainers(clusterResource, node_1); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -237,9 +237,9 @@ public void testSingleLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -250,9 +250,9 @@ public void testSingleLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -263,9 +263,9 @@ public void testSingleLevelQueues() throws Exception { root.assignContainers(clusterResource, node_1); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -346,11 +346,11 @@ public void testMultiLevelQueues() throws Exception { final int memoryPerNode = 10; final int numNodes = 3; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_2 = + FiCaSchedulerNode node_2 = TestUtils.getMockNode("host_2", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -401,11 +401,11 @@ public void testMultiLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -427,13 +427,13 @@ public void testMultiLevelQueues() throws Exception { root.assignContainers(clusterResource, node_2); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -457,9 +457,9 @@ public void testOffSwitchScheduling() throws Exception { final int memoryPerNode = 10; final int numNodes = 2; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -484,9 +484,9 @@ public void testOffSwitchScheduling() throws Exception { root.assignContainers(clusterResource, node_1); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -498,9 +498,9 @@ public void testOffSwitchScheduling() throws Exception { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -523,9 +523,9 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { final int memoryPerNode = 10; final int numNodes = 2; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -550,9 +550,9 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { root.assignContainers(clusterResource, node_1); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -564,9 +564,9 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 5dc6dfbe6dc..261fd72ad9a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; public class TestUtils { @@ -140,7 +140,7 @@ public static ApplicationId getMockApplicationId(int appId) { return applicationAttemptId; } - public static SchedulerNode getMockNode( + public static FiCaSchedulerNode getMockNode( String host, String rack, int port, int capability) { NodeId nodeId = mock(NodeId.class); when(nodeId.getHost()).thenReturn(host); @@ -153,12 +153,12 @@ public static SchedulerNode getMockNode( when(rmNode.getHostName()).thenReturn(host); when(rmNode.getRackName()).thenReturn(rack); - SchedulerNode node = spy(new SchedulerNode(rmNode)); + FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode)); LOG.info("node = " + host + " avail=" + node.getAvailableResource()); return node; } - public static ContainerId getMockContainerId(SchedulerApp application) { + public static ContainerId getMockContainerId(FiCaSchedulerApp application) { ContainerId containerId = mock(ContainerId.class); doReturn(application.getApplicationAttemptId()). when(containerId).getApplicationAttemptId();