diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1f0cfadfae7..445b7a577e8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -96,6 +96,9 @@ Release 2.4.0 - UNRELEASED YARN-1496. Protocol additions to allow moving apps between queues (Sandy Ryza) + YARN-1498. Common scheduler changes for moving apps between queues (Sandy + Ryza) + IMPROVEMENTS YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 3270dea4498..de71f71419d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -64,7 +64,7 @@ public class AppSchedulingInfo { private Set blacklist = new HashSet(); //private final ApplicationStore store; - private final ActiveUsersManager activeUsersManager; + private ActiveUsersManager activeUsersManager; /* Allocated by scheduler */ boolean pending = true; // for app metrics @@ -171,11 +171,10 @@ public class AppSchedulingInfo { .getNumContainers() : 0; Resource lastRequestCapability = lastRequest != null ? lastRequest .getCapability() : Resources.none(); - metrics.incrPendingResources(user, request.getNumContainers() - - lastRequestContainers, Resources.subtractFrom( // save a clone - Resources.multiply(request.getCapability(), request - .getNumContainers()), Resources.multiply(lastRequestCapability, - lastRequestContainers))); + metrics.incrPendingResources(user, request.getNumContainers(), + request.getCapability()); + metrics.decrPendingResources(user, lastRequestContainers, + lastRequestCapability); } } } @@ -262,6 +261,7 @@ public class AppSchedulingInfo { pending = false; metrics.runAppAttempt(applicationId, user); } + if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationId=" + applicationId + " container=" + container.getId() @@ -269,7 +269,7 @@ public class AppSchedulingInfo { + " user=" + user + " resource=" + request.getCapability()); } - metrics.allocateResources(user, 1, request.getCapability()); + metrics.allocateResources(user, 1, request.getCapability(), true); } /** @@ -359,6 +359,26 @@ public class AppSchedulingInfo { } } + synchronized public void move(Queue newQueue) { + QueueMetrics oldMetrics = queue.getMetrics(); + QueueMetrics newMetrics = newQueue.getMetrics(); + for (Map asks : requests.values()) { + ResourceRequest request = asks.get(ResourceRequest.ANY); + if (request != null) { + oldMetrics.decrPendingResources(user, request.getNumContainers(), + request.getCapability()); + newMetrics.incrPendingResources(user, request.getNumContainers(), + request.getCapability()); + } + } + oldMetrics.moveAppFrom(this); + newMetrics.moveAppTo(this); + activeUsersManager.deactivateApplication(user, applicationId); + activeUsersManager = newQueue.getActiveUsersManager(); + activeUsersManager.activateApplication(user, applicationId); + this.queue = newQueue; + } + synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { // clear pending resources metrics for the application QueueMetrics metrics = queue.getMetrics(); @@ -366,8 +386,7 @@ public class AppSchedulingInfo { ResourceRequest request = asks.get(ResourceRequest.ANY); if (request != null) { metrics.decrPendingResources(user, request.getNumContainers(), - Resources.multiply(request.getCapability(), request - .getNumContainers())); + request.getCapability()); } } metrics.finishAppAttempt(applicationId, pending, user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index 0380d323824..c51f819af58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -58,4 +58,6 @@ public interface Queue { List getQueueUserAclInfo(UserGroupInformation user); boolean hasAccess(QueueACL acl, UserGroupInformation user); + + public ActiveUsersManager getActiveUsersManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 13066141dc5..a0519e2fa01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -280,6 +280,36 @@ public class QueueMetrics implements MetricsSource { parent.finishApp(user, rmAppFinalState); } } + + public void moveAppFrom(AppSchedulingInfo app) { + if (app.isPending()) { + appsPending.decr(); + } else { + appsRunning.decr(); + } + QueueMetrics userMetrics = getUserMetrics(app.getUser()); + if (userMetrics != null) { + userMetrics.moveAppFrom(app); + } + if (parent != null) { + parent.moveAppFrom(app); + } + } + + public void moveAppTo(AppSchedulingInfo app) { + if (app.isPending()) { + appsPending.incr(); + } else { + appsRunning.incr(); + } + QueueMetrics userMetrics = getUserMetrics(app.getUser()); + if (userMetrics != null) { + userMetrics.moveAppTo(app); + } + if (parent != null) { + parent.moveAppTo(app); + } + } /** * Set available resources. To be called by scheduler periodically as @@ -324,8 +354,8 @@ public class QueueMetrics implements MetricsSource { private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); - pendingMB.incr(res.getMemory()); - pendingVCores.incr(res.getVirtualCores()); + pendingMB.incr(res.getMemory() * containers); + pendingVCores.incr(res.getVirtualCores() * containers); } public void decrPendingResources(String user, int containers, Resource res) { @@ -341,22 +371,25 @@ public class QueueMetrics implements MetricsSource { private void _decrPendingResources(int containers, Resource res) { pendingContainers.decr(containers); - pendingMB.decr(res.getMemory()); - pendingVCores.decr(res.getVirtualCores()); + pendingMB.decr(res.getMemory() * containers); + pendingVCores.decr(res.getVirtualCores() * containers); } - public void allocateResources(String user, int containers, Resource res) { + public void allocateResources(String user, int containers, Resource res, + boolean decrPending) { allocatedContainers.incr(containers); aggregateContainersAllocated.incr(containers); allocatedMB.incr(res.getMemory() * containers); allocatedVCores.incr(res.getVirtualCores() * containers); - _decrPendingResources(containers, Resources.multiply(res, containers)); + if (decrPending) { + _decrPendingResources(containers, res); + } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.allocateResources(user, containers, res); + userMetrics.allocateResources(user, containers, res, decrPending); } if (parent != null) { - parent.allocateResources(user, containers, res); + parent.allocateResources(user, containers, res, decrPending); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 38753cbbeda..7e7fdb765e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -57,7 +57,7 @@ import com.google.common.collect.Multiset; */ @Private @Unstable -public abstract class SchedulerApplicationAttempt { +public class SchedulerApplicationAttempt { private static final Log LOG = LogFactory .getLog(SchedulerApplicationAttempt.class); @@ -91,7 +91,7 @@ public abstract class SchedulerApplicationAttempt { protected Map lastScheduledContainer = new HashMap(); - protected final Queue queue; + protected Queue queue; protected boolean isStopped = false; protected final RMContext rmContext; @@ -431,4 +431,25 @@ public abstract class SchedulerApplicationAttempt { this.appSchedulingInfo .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo); } + + public void move(Queue newQueue) { + QueueMetrics oldMetrics = queue.getMetrics(); + QueueMetrics newMetrics = newQueue.getMetrics(); + String user = getUser(); + for (RMContainer liveContainer : liveContainers.values()) { + Resource resource = liveContainer.getContainer().getResource(); + oldMetrics.releaseResources(user, 1, resource); + newMetrics.allocateResources(user, 1, resource, false); + } + for (Map map : reservedContainers.values()) { + for (RMContainer reservedContainer : map.values()) { + Resource resource = reservedContainer.getReservedResource(); + oldMetrics.unreserveResource(user, resource); + newMetrics.reserveResource(user, resource); + } + } + + appSchedulingInfo.move(newQueue); + this.queue = newQueue; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 17d585ab33f..e842a6a3557 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -54,11 +55,14 @@ public class FSLeafQueue extends FSQueue { private long lastTimeAtMinShare; private long lastTimeAtHalfFairShare; + private final ActiveUsersManager activeUsersManager; + public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); + activeUsersManager = new ActiveUsersManager(getMetrics()); } public void addApp(FSSchedulerApp app, boolean runnable) { @@ -245,4 +249,9 @@ public class FSLeafQueue extends FSQueue { public int getNumRunnableApps() { return runnableAppScheds.size(); } + + @Override + public ActiveUsersManager getActiveUsersManager() { + return activeUsersManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 070780a97d4..427cb864579 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @Private @Unstable @@ -194,4 +194,10 @@ public class FSParentQueue extends FSQueue { childQueue.collectSchedulerApplications(apps); } } + + @Override + public ActiveUsersManager getActiveUsersManager() { + // Should never be called since all applications are submitted to LeafQueues + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 696a64c3c02..026f22c49c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -184,6 +184,11 @@ public class FifoScheduler extends AbstractYarnScheduler implements public boolean hasAccess(QueueACL acl, UserGroupInformation user) { return getQueueAcls().get(acl).isUserAllowed(user); } + + @Override + public ActiveUsersManager getActiveUsersManager() { + return activeUsersManager; + } }; @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index d0a8f7235fd..8ad71d231bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -73,7 +73,7 @@ public class TestQueueMetrics { checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); + metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); @@ -81,7 +81,7 @@ public class TestQueueMetrics { metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); - metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); @@ -171,7 +171,7 @@ public class TestQueueMetrics { metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); + metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); @@ -181,7 +181,7 @@ public class TestQueueMetrics { checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true); - metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); @@ -232,7 +232,7 @@ public class TestQueueMetrics { metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); + metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); @@ -242,7 +242,7 @@ public class TestQueueMetrics { checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true); - metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); metrics.reserveResource(user, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java new file mode 100644 index 00000000000..93fd30027e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.junit.After; +import org.junit.Test; + +public class TestSchedulerApplicationAttempt { + + private static final NodeId nodeId = NodeId.newInstance("somehost", 5); + + private Configuration conf = new Configuration(); + + @After + public void tearDown() { + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.shutdown(); + } + + @Test + public void testMove() { + final String user = "user1"; + Queue parentQueue = createQueue("parent", null); + Queue oldQueue = createQueue("old", parentQueue); + Queue newQueue = createQueue("new", parentQueue); + QueueMetrics parentMetrics = parentQueue.getMetrics(); + QueueMetrics oldMetrics = oldQueue.getMetrics(); + QueueMetrics newMetrics = newQueue.getMetrics(); + + ApplicationAttemptId appAttId = createAppAttemptId(0, 0); + SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, + user, oldQueue, oldQueue.getActiveUsersManager(), null); + oldMetrics.submitApp(user); + + // Resource request + Resource requestedResource = Resource.newInstance(1536, 2); + Priority requestedPriority = Priority.newInstance(2); + ResourceRequest request = ResourceRequest.newInstance(requestedPriority, + ResourceRequest.ANY, requestedResource, 3); + app.updateResourceRequests(Arrays.asList(request)); + + // Allocated container + RMContainer container1 = createRMContainer(appAttId, 1, requestedResource); + app.liveContainers.put(container1.getContainerId(), container1); + SchedulerNode node = createNode(); + app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, requestedPriority, + request, container1.getContainer()); + + // Reserved container + Priority prio1 = Priority.newInstance(1); + Resource reservedResource = Resource.newInstance(2048, 3); + RMContainer container2 = createReservedRMContainer(appAttId, 1, reservedResource, + node.getNodeID(), prio1); + Map reservations = new HashMap(); + reservations.put(node.getNodeID(), container2); + app.reservedContainers.put(prio1, reservations); + oldMetrics.reserveResource(user, reservedResource); + + checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4); + checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0); + checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4); + + app.move(newQueue); + + checkQueueMetrics(oldMetrics, 0, 0, 0, 0, 0, 0, 0, 0); + checkQueueMetrics(newMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4); + checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4); + } + + private void checkQueueMetrics(QueueMetrics metrics, int activeApps, + int runningApps, int allocMb, int allocVcores, int reservedMb, + int reservedVcores, int pendingMb, int pendingVcores) { + assertEquals(activeApps, metrics.getActiveApps()); + assertEquals(runningApps, metrics.getAppsRunning()); + assertEquals(allocMb, metrics.getAllocatedMB()); + assertEquals(allocVcores, metrics.getAllocatedVirtualCores()); + assertEquals(reservedMb, metrics.getReservedMB()); + assertEquals(reservedVcores, metrics.getReservedVirtualCores()); + assertEquals(pendingMb, metrics.getPendingMB()); + assertEquals(pendingVcores, metrics.getPendingVirtualCores()); + } + + private SchedulerNode createNode() { + SchedulerNode node = mock(SchedulerNode.class); + when(node.getNodeName()).thenReturn("somehost"); + when(node.getRackName()).thenReturn("somerack"); + when(node.getNodeID()).thenReturn(nodeId); + return node; + } + + private RMContainer createReservedRMContainer(ApplicationAttemptId appAttId, + int id, Resource resource, NodeId nodeId, Priority reservedPriority) { + RMContainer container = createRMContainer(appAttId, id, resource); + when(container.getReservedResource()).thenReturn(resource); + when(container.getReservedPriority()).thenReturn(reservedPriority); + when(container.getReservedNode()).thenReturn(nodeId); + return container; + } + + private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id, + Resource resource) { + ContainerId containerId = ContainerId.newInstance(appAttId, id); + RMContainer rmContainer = mock(RMContainer.class); + Container container = mock(Container.class); + when(container.getResource()).thenReturn(resource); + when(container.getNodeId()).thenReturn(nodeId); + when(rmContainer.getContainer()).thenReturn(container); + when(rmContainer.getContainerId()).thenReturn(containerId); + return rmContainer; + } + + private Queue createQueue(String name, Queue parent) { + QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf); + ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics); + Queue queue = mock(Queue.class); + when(queue.getMetrics()).thenReturn(metrics); + when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); + return queue; + } + + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { + ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); + ApplicationAttemptId attId = + ApplicationAttemptId.newInstance(appIdImpl, attemptId); + return attId; + } +}