YARN-897. Ensure child queues are ordered correctly to account for completed containers. Contributed by Djellel Eddine Difallah.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1505146 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9ff01d6261
commit
5b99672658
|
@ -796,6 +796,9 @@ Release 2.1.0-beta - 2013-07-02
|
|||
YARN-814. Improving diagnostics when containers fail during launch due to
|
||||
various reasons like invalid env etc. (Jian He via vinodkv)
|
||||
|
||||
YARN-897. Ensure child queues are ordered correctly to account for
|
||||
completed containers. (Djellel Eddine Difallah via acmurthy)
|
||||
|
||||
Release 2.0.5-alpha - 06/06/2013
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -185,12 +185,13 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
|
|||
* <code>null</code> if it was just a reservation
|
||||
* @param containerStatus <code>ContainerStatus</code> for the completed
|
||||
* container
|
||||
* @param childQueue <code>CSQueue</code> to reinsert in childQueues
|
||||
* @param event event to be sent to the container
|
||||
*/
|
||||
public void completedContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, FiCaSchedulerNode node,
|
||||
RMContainer container, ContainerStatus containerStatus,
|
||||
RMContainerEventType event);
|
||||
RMContainerEventType event, CSQueue childQueue);
|
||||
|
||||
/**
|
||||
* Get the number of applications in the queue.
|
||||
|
|
|
@ -673,7 +673,7 @@ public class CapacityScheduler
|
|||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
container.getId(),
|
||||
SchedulerUtils.UNRESERVED_CONTAINER),
|
||||
RMContainerEventType.RELEASED);
|
||||
RMContainerEventType.RELEASED, null);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -828,7 +828,7 @@ public class CapacityScheduler
|
|||
// Inform the queue
|
||||
LeafQueue queue = (LeafQueue)application.getQueue();
|
||||
queue.completedContainer(clusterResource, application, node,
|
||||
rmContainer, containerStatus, event);
|
||||
rmContainer, containerStatus, event, null);
|
||||
|
||||
LOG.info("Application " + applicationAttemptId +
|
||||
" released container " + container.getId() +
|
||||
|
|
|
@ -1407,7 +1407,7 @@ public class LeafQueue implements CSQueue {
|
|||
@Override
|
||||
public void completedContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
|
||||
ContainerStatus containerStatus, RMContainerEventType event) {
|
||||
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) {
|
||||
if (application != null) {
|
||||
// Careful! Locking order is important!
|
||||
synchronized (this) {
|
||||
|
@ -1442,7 +1442,7 @@ public class LeafQueue implements CSQueue {
|
|||
" cluster=" + clusterResource);
|
||||
// Inform the parent queue
|
||||
getParent().completedContainer(clusterResource, application,
|
||||
node, rmContainer, null, event);
|
||||
node, rmContainer, null, event, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -655,7 +655,7 @@ public class ParentQueue implements CSQueue {
|
|||
assignment.getResource(), Resources.none())) {
|
||||
// Remove and re-insert to sort
|
||||
iter.remove();
|
||||
LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() +
|
||||
LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() +
|
||||
" stats: " + childQueue);
|
||||
childQueues.add(childQueue);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -685,7 +685,8 @@ public class ParentQueue implements CSQueue {
|
|||
@Override
|
||||
public void completedContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, FiCaSchedulerNode node,
|
||||
RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) {
|
||||
RMContainer rmContainer, ContainerStatus containerStatus,
|
||||
RMContainerEventType event, CSQueue completedChildQueue) {
|
||||
if (application != null) {
|
||||
// Careful! Locking order is important!
|
||||
// Book keeping
|
||||
|
@ -701,10 +702,24 @@ public class ParentQueue implements CSQueue {
|
|||
" cluster=" + clusterResource);
|
||||
}
|
||||
|
||||
// reinsert the updated queue
|
||||
for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
|
||||
CSQueue csqueue = iter.next();
|
||||
if(csqueue.equals(completedChildQueue))
|
||||
{
|
||||
iter.remove();
|
||||
LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
|
||||
" stats: " + csqueue);
|
||||
childQueues.add(csqueue);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Inform the parent
|
||||
if (parent != null) {
|
||||
// complete my parent
|
||||
parent.completedContainer(clusterResource, application,
|
||||
node, rmContainer, null, event);
|
||||
node, rmContainer, null, event, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,406 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestChildQueueOrder {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestChildQueueOrder.class);
|
||||
|
||||
RMContext rmContext;
|
||||
YarnConfiguration conf;
|
||||
CapacitySchedulerConfiguration csConf;
|
||||
CapacitySchedulerContext csContext;
|
||||
|
||||
final static int GB = 1024;
|
||||
final static String DEFAULT_RACK = "/default";
|
||||
|
||||
private final ResourceCalculator resourceComparator =
|
||||
new DefaultResourceCalculator();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
rmContext = TestUtils.getMockRMContext();
|
||||
conf = new YarnConfiguration();
|
||||
csConf = new CapacitySchedulerConfiguration();
|
||||
|
||||
csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConf()).thenReturn(conf);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getMinimumResourceCapability()).thenReturn(
|
||||
Resources.createResource(GB, 1));
|
||||
when(csContext.getMaximumResourceCapability()).thenReturn(
|
||||
Resources.createResource(16*GB, 32));
|
||||
when(csContext.getClusterResources()).
|
||||
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
||||
when(csContext.getApplicationComparator()).
|
||||
thenReturn(CapacityScheduler.applicationComparator);
|
||||
when(csContext.getQueueComparator()).
|
||||
thenReturn(CapacityScheduler.queueComparator);
|
||||
when(csContext.getResourceCalculator()).
|
||||
thenReturn(resourceComparator);
|
||||
}
|
||||
|
||||
private FiCaSchedulerApp getMockApplication(int appId, String user) {
|
||||
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
|
||||
doReturn(user).when(application).getUser();
|
||||
doReturn(Resources.createResource(0, 0)).when(application).getHeadroom();
|
||||
return application;
|
||||
}
|
||||
|
||||
private void stubQueueAllocation(final CSQueue queue,
|
||||
final Resource clusterResource, final FiCaSchedulerNode node,
|
||||
final int allocation) {
|
||||
stubQueueAllocation(queue, clusterResource, node, allocation,
|
||||
NodeType.NODE_LOCAL);
|
||||
}
|
||||
|
||||
private void stubQueueAllocation(final CSQueue queue,
|
||||
final Resource clusterResource, final FiCaSchedulerNode node,
|
||||
final int allocation, final NodeType type) {
|
||||
|
||||
// Simulate the queue allocation
|
||||
doAnswer(new Answer<CSAssignment>() {
|
||||
@Override
|
||||
public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
|
||||
try {
|
||||
throw new Exception();
|
||||
} catch (Exception e) {
|
||||
LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() +
|
||||
" alloc=" + allocation + " node=" + node.getHostName());
|
||||
}
|
||||
final Resource allocatedResource = Resources.createResource(allocation);
|
||||
if (queue instanceof ParentQueue) {
|
||||
((ParentQueue)queue).allocateResource(clusterResource,
|
||||
allocatedResource);
|
||||
} else {
|
||||
FiCaSchedulerApp app1 = getMockApplication(0, "");
|
||||
((LeafQueue)queue).allocateResource(clusterResource, app1,
|
||||
allocatedResource);
|
||||
}
|
||||
|
||||
// Next call - nothing
|
||||
if (allocation > 0) {
|
||||
doReturn(new CSAssignment(Resources.none(), type)).
|
||||
when(queue).assignContainers(eq(clusterResource), eq(node));
|
||||
|
||||
// Mock the node's resource availability
|
||||
Resource available = node.getAvailableResource();
|
||||
doReturn(Resources.subtractFrom(available, allocatedResource)).
|
||||
when(node).getAvailableResource();
|
||||
}
|
||||
|
||||
return new CSAssignment(allocatedResource, type);
|
||||
}
|
||||
}).
|
||||
when(queue).assignContainers(eq(clusterResource), eq(node));
|
||||
doNothing().when(node).releaseContainer(any(Container.class));
|
||||
}
|
||||
|
||||
|
||||
private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
|
||||
int expectedMemory, Resource clusterResource) {
|
||||
return (
|
||||
((float)expectedMemory / (float)clusterResource.getMemory())
|
||||
);
|
||||
}
|
||||
|
||||
private float computeQueueUsedCapacity(CSQueue queue,
|
||||
int expectedMemory, Resource clusterResource) {
|
||||
return (expectedMemory /
|
||||
(clusterResource.getMemory() * queue.getAbsoluteCapacity()));
|
||||
}
|
||||
|
||||
final static float DELTA = 0.0001f;
|
||||
private void verifyQueueMetrics(CSQueue queue,
|
||||
int expectedMemory, Resource clusterResource) {
|
||||
assertEquals(
|
||||
computeQueueAbsoluteUsedCapacity(queue, expectedMemory, clusterResource),
|
||||
queue.getAbsoluteUsedCapacity(),
|
||||
DELTA);
|
||||
assertEquals(
|
||||
computeQueueUsedCapacity(queue, expectedMemory, clusterResource),
|
||||
queue.getUsedCapacity(),
|
||||
DELTA);
|
||||
|
||||
}
|
||||
|
||||
private static final String A = "a";
|
||||
private static final String B = "b";
|
||||
private static final String C = "c";
|
||||
private static final String D = "d";
|
||||
|
||||
private void setupSortedQueues(CapacitySchedulerConfiguration conf) {
|
||||
|
||||
// Define queues
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
|
||||
|
||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
||||
conf.setCapacity(Q_A, 25);
|
||||
|
||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
||||
conf.setCapacity(Q_B, 25);
|
||||
|
||||
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
||||
conf.setCapacity(Q_C, 25);
|
||||
|
||||
final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
|
||||
conf.setCapacity(Q_D, 25);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSortedQueues() throws Exception {
|
||||
// Setup queue configs
|
||||
setupSortedQueues(csConf);
|
||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||
CSQueue root =
|
||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||
TestUtils.spyHook);
|
||||
|
||||
// Setup some nodes
|
||||
final int memoryPerNode = 10;
|
||||
final int coresPerNode = 16;
|
||||
final int numNodes = 1;
|
||||
|
||||
FiCaSchedulerNode node_0 =
|
||||
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
|
||||
doNothing().when(node_0).releaseContainer(any(Container.class));
|
||||
|
||||
final Resource clusterResource =
|
||||
Resources.createResource(numNodes * (memoryPerNode*GB),
|
||||
numNodes * coresPerNode);
|
||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||
|
||||
// Start testing
|
||||
CSQueue a = queues.get(A);
|
||||
CSQueue b = queues.get(B);
|
||||
CSQueue c = queues.get(C);
|
||||
CSQueue d = queues.get(D);
|
||||
|
||||
final String user_0 = "user_0";
|
||||
|
||||
// Stub an App and its containerCompleted
|
||||
FiCaSchedulerApp app_0 = getMockApplication(0,user_0);
|
||||
doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
|
||||
any(ContainerStatus.class),any(RMContainerEventType.class));
|
||||
|
||||
//
|
||||
Priority priority = TestUtils.createMockPriority(1);
|
||||
ContainerAllocationExpirer expirer =
|
||||
mock(ContainerAllocationExpirer.class);
|
||||
DrainDispatcher drainDispatcher = new DrainDispatcher();
|
||||
EventHandler eventHandler = drainDispatcher.getEventHandler();
|
||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||
app_0.getApplicationId(), 1);
|
||||
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
||||
Container container=TestUtils.getMockContainer(containerId,
|
||||
node_0.getNodeID(), Resources.createResource(1*GB), priority);
|
||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
||||
node_0.getNodeID(), eventHandler, expirer);
|
||||
|
||||
// Assign {1,2,3,4} 1GB containers respectively to queues
|
||||
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
for(int i=0; i < 2; i++)
|
||||
{
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
}
|
||||
for(int i=0; i < 3; i++)
|
||||
{
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
}
|
||||
for(int i=0; i < 4; i++)
|
||||
{
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
}
|
||||
verifyQueueMetrics(a, 1*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 4*GB, clusterResource);
|
||||
LOG.info("status child-queues: " + ((ParentQueue)root).
|
||||
getChildQueuesToPrint());
|
||||
|
||||
//Release 3 x 1GB containers from D
|
||||
for(int i=0; i < 3;i++)
|
||||
{
|
||||
d.completedContainer(clusterResource, app_0, node_0,
|
||||
rmContainer, null, RMContainerEventType.KILL, null);
|
||||
}
|
||||
verifyQueueMetrics(a, 1*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 1*GB, clusterResource);
|
||||
//reset manually resources on node
|
||||
node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
|
||||
(memoryPerNode-1-2-3-1)*GB);
|
||||
LOG.info("status child-queues: " +
|
||||
((ParentQueue)root).getChildQueuesToPrint());
|
||||
|
||||
|
||||
// Assign 2 x 1GB Containers to A
|
||||
for(int i=0; i < 2; i++)
|
||||
{
|
||||
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
}
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 1*GB, clusterResource);
|
||||
LOG.info("status child-queues: " +
|
||||
((ParentQueue)root).getChildQueuesToPrint());
|
||||
|
||||
//Release 1GB Container from A
|
||||
a.completedContainer(clusterResource, app_0, node_0,
|
||||
rmContainer, null, RMContainerEventType.KILL, null);
|
||||
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 1*GB, clusterResource);
|
||||
//reset manually resources on node
|
||||
node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
|
||||
(memoryPerNode-2-2-3-1)*GB);
|
||||
LOG.info("status child-queues: " +
|
||||
((ParentQueue)root).getChildQueuesToPrint());
|
||||
|
||||
// Assign 1GB container to B
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 1*GB, clusterResource);
|
||||
LOG.info("status child-queues: " +
|
||||
((ParentQueue)root).getChildQueuesToPrint());
|
||||
|
||||
//Release 1GB container resources from B
|
||||
b.completedContainer(clusterResource, app_0, node_0,
|
||||
rmContainer, null, RMContainerEventType.KILL, null);
|
||||
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 1*GB, clusterResource);
|
||||
//reset manually resources on node
|
||||
node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0,
|
||||
(memoryPerNode-2-2-3-1)*GB);
|
||||
LOG.info("status child-queues: " +
|
||||
((ParentQueue)root).getChildQueuesToPrint());
|
||||
|
||||
// Assign 1GB container to A
|
||||
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 1*GB, clusterResource);
|
||||
LOG.info("status child-queues: " +
|
||||
((ParentQueue)root).getChildQueuesToPrint());
|
||||
|
||||
// Now do the real test, where B and D request a 1GB container
|
||||
// D should should get the next container if the order is correct
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
|
||||
root.assignContainers(clusterResource, node_0);
|
||||
InOrder allocationOrder = inOrder(d,b);
|
||||
allocationOrder.verify(d).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class));
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class));
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(d, 2*GB, clusterResource); //D got the container
|
||||
LOG.info("status child-queues: " +
|
||||
((ParentQueue)root).getChildQueuesToPrint());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
}
|
||||
}
|
|
@ -227,7 +227,7 @@ public class TestLeafQueue {
|
|||
doNothing().when(parent).completedContainer(
|
||||
any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
|
||||
any(RMContainer.class), any(ContainerStatus.class),
|
||||
any(RMContainerEventType.class));
|
||||
any(RMContainerEventType.class), any(CSQueue.class));
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
@ -480,7 +480,7 @@ public class TestLeafQueue {
|
|||
// Release each container from app_0
|
||||
for (RMContainer rmContainer : app_0.getLiveContainers()) {
|
||||
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
|
||||
null, RMContainerEventType.KILL);
|
||||
null, RMContainerEventType.KILL, null);
|
||||
}
|
||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -491,7 +491,7 @@ public class TestLeafQueue {
|
|||
// Release each container from app_1
|
||||
for (RMContainer rmContainer : app_1.getLiveContainers()) {
|
||||
a.completedContainer(clusterResource, app_1, node_0, rmContainer,
|
||||
null, RMContainerEventType.KILL);
|
||||
null, RMContainerEventType.KILL, null);
|
||||
}
|
||||
assertEquals(0*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -850,7 +850,7 @@ public class TestLeafQueue {
|
|||
// 8. Release each container from app_0
|
||||
for (RMContainer rmContainer : app_0.getLiveContainers()) {
|
||||
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
|
||||
null, RMContainerEventType.KILL);
|
||||
null, RMContainerEventType.KILL, null);
|
||||
}
|
||||
assertEquals(5*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -861,7 +861,7 @@ public class TestLeafQueue {
|
|||
// 9. Release each container from app_2
|
||||
for (RMContainer rmContainer : app_2.getLiveContainers()) {
|
||||
a.completedContainer(clusterResource, app_2, node_0, rmContainer,
|
||||
null, RMContainerEventType.KILL);
|
||||
null, RMContainerEventType.KILL, null);
|
||||
}
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -872,7 +872,7 @@ public class TestLeafQueue {
|
|||
// 10. Release each container from app_3
|
||||
for (RMContainer rmContainer : app_3.getLiveContainers()) {
|
||||
a.completedContainer(clusterResource, app_3, node_0, rmContainer,
|
||||
null, RMContainerEventType.KILL);
|
||||
null, RMContainerEventType.KILL, null);
|
||||
}
|
||||
assertEquals(0*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -959,7 +959,8 @@ public class TestLeafQueue {
|
|||
|
||||
// Now free 1 container from app_0 i.e. 1G
|
||||
a.completedContainer(clusterResource, app_0, node_0,
|
||||
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
|
||||
app_0.getLiveContainers().iterator().next(),
|
||||
null, RMContainerEventType.KILL, null);
|
||||
a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(5*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -971,7 +972,8 @@ public class TestLeafQueue {
|
|||
|
||||
// Now finish another container from app_0 and fulfill the reservation
|
||||
a.completedContainer(clusterResource, app_0, node_0,
|
||||
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
|
||||
app_0.getLiveContainers().iterator().next(),
|
||||
null, RMContainerEventType.KILL, null);
|
||||
a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1069,7 +1071,8 @@ public class TestLeafQueue {
|
|||
|
||||
// Now free 1 container from app_0 and try to assign to node_0
|
||||
a.completedContainer(clusterResource, app_0, node_0,
|
||||
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
|
||||
app_0.getLiveContainers().iterator().next(),
|
||||
null, RMContainerEventType.KILL, null);
|
||||
a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(8*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1160,7 +1163,8 @@ public class TestLeafQueue {
|
|||
|
||||
// Now free 1 container from app_0 i.e. 1G, and re-reserve it
|
||||
a.completedContainer(clusterResource, app_0, node_0,
|
||||
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
|
||||
app_0.getLiveContainers().iterator().next(),
|
||||
null, RMContainerEventType.KILL, null);
|
||||
a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(5*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1191,7 +1195,8 @@ public class TestLeafQueue {
|
|||
|
||||
// Now finish another container from app_0 and see the reservation cancelled
|
||||
a.completedContainer(clusterResource, app_0, node_0,
|
||||
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
|
||||
app_0.getLiveContainers().iterator().next(),
|
||||
null, RMContainerEventType.KILL, null);
|
||||
CSAssignment assignment = a.assignContainers(clusterResource, node_0);
|
||||
assertEquals(8*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
|
Loading…
Reference in New Issue