diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 19dd41879ae..6cde3c83ff2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -796,6 +796,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-814. Improving diagnostics when containers fail during launch due to
various reasons like invalid env etc. (Jian He via vinodkv)
+ YARN-897. Ensure child queues are ordered correctly to account for
+ completed containers. (Djellel Eddine Difallah via acmurthy)
+
Release 2.0.5-alpha - 06/06/2013
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index d21a888cf91..769b157bdf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -185,12 +185,13 @@ public CSAssignment assignContainers(
* null
if it was just a reservation
* @param containerStatus ContainerStatus
for the completed
* container
+ * @param childQueue CSQueue
to reinsert in childQueues
* @param event event to be sent to the container
*/
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer container, ContainerStatus containerStatus,
- RMContainerEventType event);
+ RMContainerEventType event, CSQueue childQueue);
/**
* Get the number of applications in the queue.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 57ffc0c396f..29c4d4b9de3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -673,7 +673,7 @@ private synchronized void nodeUpdate(RMNode nm) {
SchedulerUtils.createAbnormalContainerStatus(
container.getId(),
SchedulerUtils.UNRESERVED_CONTAINER),
- RMContainerEventType.RELEASED);
+ RMContainerEventType.RELEASED, null);
}
}
@@ -828,7 +828,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
queue.completedContainer(clusterResource, application, node,
- rmContainer, containerStatus, event);
+ rmContainer, containerStatus, event, null);
LOG.info("Application " + applicationAttemptId +
" released container " + container.getId() +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3d9ac4f3ce3..d8b89720d8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1407,7 +1407,7 @@ private boolean unreserve(FiCaSchedulerApp application, Priority priority,
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
- ContainerStatus containerStatus, RMContainerEventType event) {
+ ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) {
if (application != null) {
// Careful! Locking order is important!
synchronized (this) {
@@ -1442,7 +1442,7 @@ public void completedContainer(Resource clusterResource,
" cluster=" + clusterResource);
// Inform the parent queue
getParent().completedContainer(clusterResource, application,
- node, rmContainer, null, event);
+ node, rmContainer, null, event, this);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 1a87984ef83..9a450069208 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -655,7 +655,7 @@ synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
assignment.getResource(), Resources.none())) {
// Remove and re-insert to sort
iter.remove();
- LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() +
+ LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() +
" stats: " + childQueue);
childQueues.add(childQueue);
if (LOG.isDebugEnabled()) {
@@ -685,7 +685,8 @@ void printChildQueues() {
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node,
- RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) {
+ RMContainer rmContainer, ContainerStatus containerStatus,
+ RMContainerEventType event, CSQueue completedChildQueue) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
@@ -701,10 +702,24 @@ public void completedContainer(Resource clusterResource,
" cluster=" + clusterResource);
}
+ // reinsert the updated queue
+ for (Iterator iter=childQueues.iterator(); iter.hasNext();) {
+ CSQueue csqueue = iter.next();
+ if(csqueue.equals(completedChildQueue))
+ {
+ iter.remove();
+ LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
+ " stats: " + csqueue);
+ childQueues.add(csqueue);
+ break;
+ }
+ }
+
// Inform the parent
if (parent != null) {
+ // complete my parent
parent.completedContainer(clusterResource, application,
- node, rmContainer, null, event);
+ node, rmContainer, null, event, this);
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
new file mode 100644
index 00000000000..014385c12eb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestChildQueueOrder {
+
+ private static final Log LOG = LogFactory.getLog(TestChildQueueOrder.class);
+
+ RMContext rmContext;
+ YarnConfiguration conf;
+ CapacitySchedulerConfiguration csConf;
+ CapacitySchedulerContext csContext;
+
+ final static int GB = 1024;
+ final static String DEFAULT_RACK = "/default";
+
+ private final ResourceCalculator resourceComparator =
+ new DefaultResourceCalculator();
+
+ @Before
+ public void setUp() throws Exception {
+ rmContext = TestUtils.getMockRMContext();
+ conf = new YarnConfiguration();
+ csConf = new CapacitySchedulerConfiguration();
+
+ csContext = mock(CapacitySchedulerContext.class);
+ when(csContext.getConf()).thenReturn(conf);
+ when(csContext.getConfiguration()).thenReturn(csConf);
+ when(csContext.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(GB, 1));
+ when(csContext.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(16*GB, 32));
+ when(csContext.getClusterResources()).
+ thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
+ when(csContext.getApplicationComparator()).
+ thenReturn(CapacityScheduler.applicationComparator);
+ when(csContext.getQueueComparator()).
+ thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getResourceCalculator()).
+ thenReturn(resourceComparator);
+ }
+
+ private FiCaSchedulerApp getMockApplication(int appId, String user) {
+ FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
+ doReturn(user).when(application).getUser();
+ doReturn(Resources.createResource(0, 0)).when(application).getHeadroom();
+ return application;
+ }
+
+ private void stubQueueAllocation(final CSQueue queue,
+ final Resource clusterResource, final FiCaSchedulerNode node,
+ final int allocation) {
+ stubQueueAllocation(queue, clusterResource, node, allocation,
+ NodeType.NODE_LOCAL);
+ }
+
+ private void stubQueueAllocation(final CSQueue queue,
+ final Resource clusterResource, final FiCaSchedulerNode node,
+ final int allocation, final NodeType type) {
+
+ // Simulate the queue allocation
+ doAnswer(new Answer() {
+ @Override
+ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
+ try {
+ throw new Exception();
+ } catch (Exception e) {
+ LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() +
+ " alloc=" + allocation + " node=" + node.getHostName());
+ }
+ final Resource allocatedResource = Resources.createResource(allocation);
+ if (queue instanceof ParentQueue) {
+ ((ParentQueue)queue).allocateResource(clusterResource,
+ allocatedResource);
+ } else {
+ FiCaSchedulerApp app1 = getMockApplication(0, "");
+ ((LeafQueue)queue).allocateResource(clusterResource, app1,
+ allocatedResource);
+ }
+
+ // Next call - nothing
+ if (allocation > 0) {
+ doReturn(new CSAssignment(Resources.none(), type)).
+ when(queue).assignContainers(eq(clusterResource), eq(node));
+
+ // Mock the node's resource availability
+ Resource available = node.getAvailableResource();
+ doReturn(Resources.subtractFrom(available, allocatedResource)).
+ when(node).getAvailableResource();
+ }
+
+ return new CSAssignment(allocatedResource, type);
+ }
+ }).
+ when(queue).assignContainers(eq(clusterResource), eq(node));
+ doNothing().when(node).releaseContainer(any(Container.class));
+ }
+
+
+ private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
+ int expectedMemory, Resource clusterResource) {
+ return (
+ ((float)expectedMemory / (float)clusterResource.getMemory())
+ );
+ }
+
+ private float computeQueueUsedCapacity(CSQueue queue,
+ int expectedMemory, Resource clusterResource) {
+ return (expectedMemory /
+ (clusterResource.getMemory() * queue.getAbsoluteCapacity()));
+ }
+
+ final static float DELTA = 0.0001f;
+ private void verifyQueueMetrics(CSQueue queue,
+ int expectedMemory, Resource clusterResource) {
+ assertEquals(
+ computeQueueAbsoluteUsedCapacity(queue, expectedMemory, clusterResource),
+ queue.getAbsoluteUsedCapacity(),
+ DELTA);
+ assertEquals(
+ computeQueueUsedCapacity(queue, expectedMemory, clusterResource),
+ queue.getUsedCapacity(),
+ DELTA);
+
+ }
+
+ private static final String A = "a";
+ private static final String B = "b";
+ private static final String C = "c";
+ private static final String D = "d";
+
+ private void setupSortedQueues(CapacitySchedulerConfiguration conf) {
+
+ // Define queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
+
+ final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
+ conf.setCapacity(Q_A, 25);
+
+ final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
+ conf.setCapacity(Q_B, 25);
+
+ final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
+ conf.setCapacity(Q_C, 25);
+
+ final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
+ conf.setCapacity(Q_D, 25);
+ }
+
+ @Test
+ public void testSortedQueues() throws Exception {
+ // Setup queue configs
+ setupSortedQueues(csConf);
+ Map queues = new HashMap();
+ CSQueue root =
+ CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT, queues, queues,
+ TestUtils.spyHook);
+
+ // Setup some nodes
+ final int memoryPerNode = 10;
+ final int coresPerNode = 16;
+ final int numNodes = 1;
+
+ FiCaSchedulerNode node_0 =
+ TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
+ doNothing().when(node_0).releaseContainer(any(Container.class));
+
+ final Resource clusterResource =
+ Resources.createResource(numNodes * (memoryPerNode*GB),
+ numNodes * coresPerNode);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Start testing
+ CSQueue a = queues.get(A);
+ CSQueue b = queues.get(B);
+ CSQueue c = queues.get(C);
+ CSQueue d = queues.get(D);
+
+ final String user_0 = "user_0";
+
+ // Stub an App and its containerCompleted
+ FiCaSchedulerApp app_0 = getMockApplication(0,user_0);
+ doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
+ any(ContainerStatus.class),any(RMContainerEventType.class));
+
+ //
+ Priority priority = TestUtils.createMockPriority(1);
+ ContainerAllocationExpirer expirer =
+ mock(ContainerAllocationExpirer.class);
+ DrainDispatcher drainDispatcher = new DrainDispatcher();
+ EventHandler eventHandler = drainDispatcher.getEventHandler();
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ app_0.getApplicationId(), 1);
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+ Container container=TestUtils.getMockContainer(containerId,
+ node_0.getNodeID(), Resources.createResource(1*GB), priority);
+ RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+ node_0.getNodeID(), eventHandler, expirer);
+
+ // Assign {1,2,3,4} 1GB containers respectively to queues
+ stubQueueAllocation(a, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+ root.assignContainers(clusterResource, node_0);
+ for(int i=0; i < 2; i++)
+ {
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+ root.assignContainers(clusterResource, node_0);
+ }
+ for(int i=0; i < 3; i++)
+ {
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+ root.assignContainers(clusterResource, node_0);
+ }
+ for(int i=0; i < 4; i++)
+ {
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 1*GB);
+ root.assignContainers(clusterResource, node_0);
+ }
+ verifyQueueMetrics(a, 1*GB, clusterResource);
+ verifyQueueMetrics(b, 2*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 4*GB, clusterResource);
+ LOG.info("status child-queues: " + ((ParentQueue)root).
+ getChildQueuesToPrint());
+
+ //Release 3 x 1GB containers from D
+ for(int i=0; i < 3;i++)
+ {
+ d.completedContainer(clusterResource, app_0, node_0,
+ rmContainer, null, RMContainerEventType.KILL, null);
+ }
+ verifyQueueMetrics(a, 1*GB, clusterResource);
+ verifyQueueMetrics(b, 2*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 1*GB, clusterResource);
+ //reset manually resources on node
+ node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
+ (memoryPerNode-1-2-3-1)*GB);
+ LOG.info("status child-queues: " +
+ ((ParentQueue)root).getChildQueuesToPrint());
+
+
+ // Assign 2 x 1GB Containers to A
+ for(int i=0; i < 2; i++)
+ {
+ stubQueueAllocation(a, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+ root.assignContainers(clusterResource, node_0);
+ }
+ verifyQueueMetrics(a, 3*GB, clusterResource);
+ verifyQueueMetrics(b, 2*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 1*GB, clusterResource);
+ LOG.info("status child-queues: " +
+ ((ParentQueue)root).getChildQueuesToPrint());
+
+ //Release 1GB Container from A
+ a.completedContainer(clusterResource, app_0, node_0,
+ rmContainer, null, RMContainerEventType.KILL, null);
+ verifyQueueMetrics(a, 2*GB, clusterResource);
+ verifyQueueMetrics(b, 2*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 1*GB, clusterResource);
+ //reset manually resources on node
+ node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
+ (memoryPerNode-2-2-3-1)*GB);
+ LOG.info("status child-queues: " +
+ ((ParentQueue)root).getChildQueuesToPrint());
+
+ // Assign 1GB container to B
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+ root.assignContainers(clusterResource, node_0);
+ verifyQueueMetrics(a, 2*GB, clusterResource);
+ verifyQueueMetrics(b, 3*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 1*GB, clusterResource);
+ LOG.info("status child-queues: " +
+ ((ParentQueue)root).getChildQueuesToPrint());
+
+ //Release 1GB container resources from B
+ b.completedContainer(clusterResource, app_0, node_0,
+ rmContainer, null, RMContainerEventType.KILL, null);
+ verifyQueueMetrics(a, 2*GB, clusterResource);
+ verifyQueueMetrics(b, 2*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 1*GB, clusterResource);
+ //reset manually resources on node
+ node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
+ (memoryPerNode-2-2-3-1)*GB);
+ LOG.info("status child-queues: " +
+ ((ParentQueue)root).getChildQueuesToPrint());
+
+ // Assign 1GB container to A
+ stubQueueAllocation(a, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+ root.assignContainers(clusterResource, node_0);
+ verifyQueueMetrics(a, 3*GB, clusterResource);
+ verifyQueueMetrics(b, 2*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 1*GB, clusterResource);
+ LOG.info("status child-queues: " +
+ ((ParentQueue)root).getChildQueuesToPrint());
+
+ // Now do the real test, where B and D request a 1GB container
+ // D should should get the next container if the order is correct
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 1*GB);
+ root.assignContainers(clusterResource, node_0);
+ InOrder allocationOrder = inOrder(d,b);
+ allocationOrder.verify(d).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class));
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class));
+ verifyQueueMetrics(a, 3*GB, clusterResource);
+ verifyQueueMetrics(b, 2*GB, clusterResource);
+ verifyQueueMetrics(c, 3*GB, clusterResource);
+ verifyQueueMetrics(d, 2*GB, clusterResource); //D got the container
+ LOG.info("status child-queues: " +
+ ((ParentQueue)root).getChildQueuesToPrint());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 9779f154ffb..2569df3d17b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -227,7 +227,7 @@ public Container answer(InvocationOnMock invocation)
doNothing().when(parent).completedContainer(
any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
any(RMContainer.class), any(ContainerStatus.class),
- any(RMContainerEventType.class));
+ any(RMContainerEventType.class), any(CSQueue.class));
return queue;
}
@@ -480,7 +480,7 @@ public void testSingleQueueWithOneUser() throws Exception {
// Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
- null, RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL, null);
}
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -491,7 +491,7 @@ public void testSingleQueueWithOneUser() throws Exception {
// Release each container from app_1
for (RMContainer rmContainer : app_1.getLiveContainers()) {
a.completedContainer(clusterResource, app_1, node_0, rmContainer,
- null, RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL, null);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -850,7 +850,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// 8. Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
- null, RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL, null);
}
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -861,7 +861,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// 9. Release each container from app_2
for (RMContainer rmContainer : app_2.getLiveContainers()) {
a.completedContainer(clusterResource, app_2, node_0, rmContainer,
- null, RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL, null);
}
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -872,7 +872,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// 10. Release each container from app_3
for (RMContainer rmContainer : app_3.getLiveContainers()) {
a.completedContainer(clusterResource, app_3, node_0, rmContainer,
- null, RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL, null);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -959,7 +959,8 @@ public void testReservation() throws Exception {
// Now free 1 container from app_0 i.e. 1G
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(),
+ null, RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -971,7 +972,8 @@ public void testReservation() throws Exception {
// Now finish another container from app_0 and fulfill the reservation
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(),
+ null, RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1069,7 +1071,8 @@ public void testStolenReservedContainer() throws Exception {
// Now free 1 container from app_0 and try to assign to node_0
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(),
+ null, RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1160,7 +1163,8 @@ public void testReservationExchange() throws Exception {
// Now free 1 container from app_0 i.e. 1G, and re-reserve it
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(),
+ null, RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1191,7 +1195,8 @@ public void testReservationExchange() throws Exception {
// Now finish another container from app_0 and see the reservation cancelled
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(),
+ null, RMContainerEventType.KILL, null);
CSAssignment assignment = a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());