YARN-2604. Scheduler should consider max-allocation-* in conjunction with the largest node. (Robert Kanter via kasha)

(cherry picked from commit 3114d4731d)
Karthik Kambatla 2014-11-21 10:32:28 -08:00
parent 6570f7fd72
commit e9db0aa35c
9 changed files with 348 additions and 49 deletions
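What the patch does, in brief: AbstractYarnScheduler now tracks the largest memory and vcores offered by any registered node, and getMaximumResourceCapability() returns the configured max-allocation capped by that largest node. Until YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS has elapsed since the RM's cluster timestamp (so nodes recovering after a restart get a chance to register), only the configured values are reported. The helper below is an illustrative sketch of that calculation, not code from this commit; the class and method names are invented, while Resource, Resources, and the -1 "no nodes yet" sentinel mirror the patch.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

// Hypothetical helper (illustration only): the effective maximum allocation after
// this patch is the configured ceiling, capped per dimension by the largest
// registered node; -1 means no node has been seen for that dimension yet.
final class EffectiveMaxAllocation {
  static Resource of(Resource configuredMax, int maxNodeMemory, int maxNodeVCores,
      boolean useConfiguredOnly) {
    Resource max = Resources.clone(configuredMax);
    if (useConfiguredOnly) {
      return max;  // still inside the startup/recovery wait window
    }
    max.setMemory(maxNodeMemory == -1
        ? configuredMax.getMemory()
        : Math.min(configuredMax.getMemory(), maxNodeMemory));
    max.setVirtualCores(maxNodeVCores == -1
        ? configuredMax.getVirtualCores()
        : Math.min(configuredMax.getVirtualCores(), maxNodeVCores));
    return max;
  }
}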

hadoop-yarn-project/CHANGES.txt

@@ -57,6 +57,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2375. Allow enabling/disabling timeline server per framework.
     (Mit Desai via jeagles)
 
+    YARN-2604. Scheduler should consider max-allocation-* in conjunction
+    with the largest node. (Robert Kanter via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

AbstractYarnScheduler.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -77,7 +78,14 @@ public abstract class AbstractYarnScheduler
   protected Resource clusterResource = Resource.newInstance(0, 0);
 
   protected Resource minimumAllocation;
-  protected Resource maximumAllocation;
+  private Resource maximumAllocation;
+  private Resource configuredMaximumAllocation;
+  private int maxNodeMemory = -1;
+  private int maxNodeVCores = -1;
+  private ReentrantReadWriteLock maximumAllocationLock =
+      new ReentrantReadWriteLock();
+  private boolean useConfiguredMaximumAllocationOnly = true;
+  private long configuredMaximumAllocationWaitTime;
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
@@ -102,6 +110,9 @@ public abstract class AbstractYarnScheduler
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    configuredMaximumAllocationWaitTime =
+        conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
     createReleaseCache();
     super.serviceInit(conf);
   }
@@ -145,7 +156,37 @@ public abstract class AbstractYarnScheduler
   @Override
   public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
+    Resource maxResource;
+    ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
+    readLock.lock();
+    try {
+      if (useConfiguredMaximumAllocationOnly) {
+        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+            > configuredMaximumAllocationWaitTime) {
+          useConfiguredMaximumAllocationOnly = false;
+        }
+        maxResource = Resources.clone(configuredMaximumAllocation);
+      } else {
+        maxResource = Resources.clone(maximumAllocation);
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return maxResource;
+  }
+
+  protected void initMaximumResourceCapability(Resource maximumAllocation) {
+    ReentrantReadWriteLock.WriteLock writeLock =
+        maximumAllocationLock.writeLock();
+    writeLock.lock();
+    try {
+      if (this.configuredMaximumAllocation == null) {
+        this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
+        this.maximumAllocation = Resources.clone(maximumAllocation);
+      }
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   protected void containerLaunchedOnNode(ContainerId containerId,
@@ -528,4 +569,63 @@ public abstract class AbstractYarnScheduler
     throw new YarnException(getClass().getSimpleName()
       + " does not support reservations");
   }
+
+  protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
+    ReentrantReadWriteLock.WriteLock writeLock =
+        maximumAllocationLock.writeLock();
+    writeLock.lock();
+    try {
+      if (add) { // added node
+        int nodeMemory = node.getAvailableResource().getMemory();
+        if (nodeMemory > maxNodeMemory) {
+          maxNodeMemory = nodeMemory;
+          maximumAllocation.setMemory(Math.min(
+              configuredMaximumAllocation.getMemory(), maxNodeMemory));
+        }
+        int nodeVCores = node.getAvailableResource().getVirtualCores();
+        if (nodeVCores > maxNodeVCores) {
+          maxNodeVCores = nodeVCores;
+          maximumAllocation.setVirtualCores(Math.min(
+              configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
+        }
+      } else {  // removed node
+        if (maxNodeMemory == node.getAvailableResource().getMemory()) {
+          maxNodeMemory = -1;
+        }
+        if (maxNodeVCores == node.getAvailableResource().getVirtualCores()) {
+          maxNodeVCores = -1;
+        }
+        // We only have to iterate through the nodes if the current max memory
+        // or vcores was equal to the removed node's
+        if (maxNodeMemory == -1 || maxNodeVCores == -1) {
+          for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
+            int nodeMemory =
+                nodeEntry.getValue().getAvailableResource().getMemory();
+            if (nodeMemory > maxNodeMemory) {
+              maxNodeMemory = nodeMemory;
+            }
+            int nodeVCores =
+                nodeEntry.getValue().getAvailableResource().getVirtualCores();
+            if (nodeVCores > maxNodeVCores) {
+              maxNodeVCores = nodeVCores;
+            }
+          }
+          if (maxNodeMemory == -1) {  // no nodes
+            maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
+          } else {
+            maximumAllocation.setMemory(
+                Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
+          }
+          if (maxNodeVCores == -1) {  // no nodes
+            maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
+          } else {
+            maximumAllocation.setVirtualCores(
+                Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }
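
The new updateMaximumAllocation() above is the only place the tracked node maxima change: adding a node can only raise them, and removing a node triggers a rescan of the remaining nodes only when that node was the one defining the current maximum. A standalone model of that bookkeeping, simplified to plain ints instead of YARN Resource objects and using an invented class name, might look like this; in the real class the same updates run under maximumAllocationLock's write lock, while getMaximumResourceCapability() reads under the read lock.

import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the max-node bookkeeping (memory in MB and
// vcores as plain ints); the committed code works on SchedulerNode resources.
final class MaxNodeTracker {
  private final Map<String, int[]> nodes = new HashMap<String, int[]>();
  private int maxNodeMemory = -1;
  private int maxNodeVCores = -1;

  void addNode(String nodeId, int memoryMB, int vcores) {
    nodes.put(nodeId, new int[] {memoryMB, vcores});
    maxNodeMemory = Math.max(maxNodeMemory, memoryMB);  // adds can only raise the maxima
    maxNodeVCores = Math.max(maxNodeVCores, vcores);
  }

  void removeNode(String nodeId) {
    int[] removed = nodes.remove(nodeId);
    if (removed == null) {
      return;
    }
    // Rescan only if the removed node was defining a current maximum.
    if (removed[0] == maxNodeMemory || removed[1] == maxNodeVCores) {
      maxNodeMemory = -1;
      maxNodeVCores = -1;
      for (int[] r : nodes.values()) {
        maxNodeMemory = Math.max(maxNodeMemory, r[0]);
        maxNodeVCores = Math.max(maxNodeVCores, r[1]);
      }
    }
  }

  // Effective caps: fall back to the configured value when no nodes are registered.
  int effectiveMaxMemory(int configuredMaxMB) {
    return maxNodeMemory == -1 ? configuredMaxMB
        : Math.min(configuredMaxMB, maxNodeMemory);
  }

  int effectiveMaxVCores(int configuredVCores) {
    return maxNodeVCores == -1 ? configuredVCores
        : Math.min(configuredVCores, maxNodeVCores);
  }
}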

CapacityScheduler.java

@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.HashSet;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -284,7 +283,7 @@ public class CapacityScheduler extends
     this.conf = loadCapacitySchedulerConfiguration(configuration);
     validateConf(this.conf);
     this.minimumAllocation = this.conf.getMinimumAllocation();
-    this.maximumAllocation = this.conf.getMaximumAllocation();
+    initMaximumResourceCapability(this.conf.getMaximumAllocation());
     this.calculator = this.conf.getResourceCalculator();
     this.usePortForNodeName = this.conf.getUsePortForNodeName();
     this.applications =
@@ -321,8 +320,8 @@ public class CapacityScheduler extends
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
-    initScheduler(configuration);
     super.serviceInit(conf);
+    initScheduler(configuration);
   }
 
   @Override
@@ -849,7 +848,7 @@ public class CapacityScheduler extends
     // Sanity check
     SchedulerUtils.normalizeRequests(
         ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), maximumAllocation);
+        getMinimumResourceCapability(), getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -1123,12 +1122,13 @@ public class CapacityScheduler extends
       labelManager.activateNode(nodeManager.getNodeID(),
           nodeManager.getTotalCapability());
     }
 
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
+    updateMaximumAllocation(schedulerNode, true);
 
     LOG.info("Added node " + nodeManager.getNodeAddress() +
         " clusterResource: " + clusterResource);
@@ -1177,6 +1177,7 @@ public class CapacityScheduler extends
     }
 
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(node, false);
 
     LOG.info("Removed node " + nodeInfo.getNodeAddress() +
         " clusterResource: " + clusterResource);

FairScheduler.java

@@ -817,9 +817,11 @@ public class FairScheduler extends
   }
 
   private synchronized void addNode(RMNode node) {
-    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
+    nodes.put(node.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, node.getTotalCapability());
     updateRootQueueMetrics();
+    updateMaximumAllocation(schedulerNode, true);
 
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
@@ -859,6 +861,7 @@ public class FairScheduler extends
     nodes.remove(rmNode.getNodeID());
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
+    updateMaximumAllocation(node, false);
     LOG.info("Removed node " + rmNode.getNodeAddress() +
         " cluster capacity: " + clusterResource);
   }
@@ -877,7 +880,8 @@ public class FairScheduler extends
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability(),
+        incrAllocation);
 
     // Set amResource for this app
     if (!application.getUnmanagedAM() && ask.size() == 1
@@ -1225,7 +1229,7 @@ public class FairScheduler extends
     this.conf = new FairSchedulerConfiguration(conf);
     validateConf(this.conf);
     minimumAllocation = this.conf.getMinimumAllocation();
-    maximumAllocation = this.conf.getMaximumAllocation();
+    initMaximumResourceCapability(this.conf.getMaximumAllocation());
     incrAllocation = this.conf.getIncrementAllocation();
     continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
     continuousSchedulingSleepMs =

FifoScheduler.java

@@ -215,10 +215,13 @@ public class FifoScheduler extends
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-    this.maximumAllocation =
+    initMaximumResourceCapability(
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
+          conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
     this.usePortForNodeName = conf.getBoolean(
         YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
@@ -303,7 +306,7 @@ public class FifoScheduler extends
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, resourceCalculator,
-        clusterResource, minimumAllocation, maximumAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -899,6 +902,7 @@ public class FifoScheduler extends
     //Remove the node
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(node, false);
 
     // Update cluster metrics
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
@@ -916,9 +920,11 @@ public class FifoScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    updateMaximumAllocation(schedulerNode, true);
   }
 
   @Override

View File

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;

TestAbstractYarnScheduler.java (new file)

@@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;

public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {

  public TestAbstractYarnScheduler(SchedulerType type) {
    super(type);
  }

  @Test
  public void testMaximimumAllocationMemory() throws Exception {
    final int node1MaxMemory = 15 * 1024;
    final int node2MaxMemory = 5 * 1024;
    final int node3MaxMemory = 6 * 1024;
    final int configuredMaxMemory = 10 * 1024;
    configureScheduler();
    YarnConfiguration conf = getConf();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
        configuredMaxMemory);
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        1000 * 1000);
    MockRM rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationMemoryHelper(
          (AbstractYarnScheduler) rm.getResourceScheduler(),
          node1MaxMemory, node2MaxMemory, node3MaxMemory,
          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
    } finally {
      rm.stop();
    }

    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        0);
    rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationMemoryHelper(
          (AbstractYarnScheduler) rm.getResourceScheduler(),
          node1MaxMemory, node2MaxMemory, node3MaxMemory,
          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
          node2MaxMemory, node3MaxMemory, node2MaxMemory);
    } finally {
      rm.stop();
    }
  }

  private void testMaximumAllocationMemoryHelper(
      AbstractYarnScheduler scheduler,
      final int node1MaxMemory, final int node2MaxMemory,
      final int node3MaxMemory, final int... expectedMaxMemory)
      throws Exception {
    Assert.assertEquals(6, expectedMaxMemory.length);

    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    int maxMemory = scheduler.getMaximumResourceCapability().getMemory();
    Assert.assertEquals(expectedMaxMemory[0], maxMemory);

    RMNode node1 = MockNodes.newNodeInfo(
        0, Resources.createResource(node1MaxMemory), 1, "127.0.0.2");
    scheduler.handle(new NodeAddedSchedulerEvent(node1));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
    Assert.assertEquals(expectedMaxMemory[1], maxMemory);

    scheduler.handle(new NodeRemovedSchedulerEvent(node1));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
    Assert.assertEquals(expectedMaxMemory[2], maxMemory);

    RMNode node2 = MockNodes.newNodeInfo(
        0, Resources.createResource(node2MaxMemory), 2, "127.0.0.3");
    scheduler.handle(new NodeAddedSchedulerEvent(node2));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
    Assert.assertEquals(expectedMaxMemory[3], maxMemory);

    RMNode node3 = MockNodes.newNodeInfo(
        0, Resources.createResource(node3MaxMemory), 3, "127.0.0.4");
    scheduler.handle(new NodeAddedSchedulerEvent(node3));
    Assert.assertEquals(2, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
    Assert.assertEquals(expectedMaxMemory[4], maxMemory);

    scheduler.handle(new NodeRemovedSchedulerEvent(node3));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemory();
    Assert.assertEquals(expectedMaxMemory[5], maxMemory);

    scheduler.handle(new NodeRemovedSchedulerEvent(node2));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
  }

  @Test
  public void testMaximimumAllocationVCores() throws Exception {
    final int node1MaxVCores = 15;
    final int node2MaxVCores = 5;
    final int node3MaxVCores = 6;
    final int configuredMaxVCores = 10;
    configureScheduler();
    YarnConfiguration conf = getConf();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
        configuredMaxVCores);
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        1000 * 1000);
    MockRM rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationVCoresHelper(
          (AbstractYarnScheduler) rm.getResourceScheduler(),
          node1MaxVCores, node2MaxVCores, node3MaxVCores,
          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores);
    } finally {
      rm.stop();
    }

    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        0);
    rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationVCoresHelper(
          (AbstractYarnScheduler) rm.getResourceScheduler(),
          node1MaxVCores, node2MaxVCores, node3MaxVCores,
          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
          node2MaxVCores, node3MaxVCores, node2MaxVCores);
    } finally {
      rm.stop();
    }
  }

  private void testMaximumAllocationVCoresHelper(
      AbstractYarnScheduler scheduler,
      final int node1MaxVCores, final int node2MaxVCores,
      final int node3MaxVCores, final int... expectedMaxVCores)
      throws Exception {
    Assert.assertEquals(6, expectedMaxVCores.length);

    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    int maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[0], maxVCores);

    RMNode node1 = MockNodes.newNodeInfo(
        0, Resources.createResource(1024, node1MaxVCores), 1, "127.0.0.2");
    scheduler.handle(new NodeAddedSchedulerEvent(node1));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[1], maxVCores);

    scheduler.handle(new NodeRemovedSchedulerEvent(node1));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[2], maxVCores);

    RMNode node2 = MockNodes.newNodeInfo(
        0, Resources.createResource(1024, node2MaxVCores), 2, "127.0.0.3");
    scheduler.handle(new NodeAddedSchedulerEvent(node2));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[3], maxVCores);

    RMNode node3 = MockNodes.newNodeInfo(
        0, Resources.createResource(1024, node3MaxVCores), 3, "127.0.0.4");
    scheduler.handle(new NodeAddedSchedulerEvent(node3));
    Assert.assertEquals(2, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[4], maxVCores);

    scheduler.handle(new NodeRemovedSchedulerEvent(node3));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[5], maxVCores);

    scheduler.handle(new NodeRemovedSchedulerEvent(node2));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
  }
}

TestContainerAllocation.java

@@ -116,7 +116,7 @@ public class TestContainerAllocation {
     am1.registerAppAttempt();
 
     LOG.info("sending container requests ");
-    am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
+    am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
     AllocateResponse alloc1Response = am1.schedule(); // send the request
 
     // kick the scheduler
@@ -291,7 +291,7 @@ public class TestContainerAllocation {
   // This is to test fetching AM container will be retried, if AM container is
   // not fetchable since DNS is unavailable causing container token/NMtoken
   // creation failure.
-  @Test(timeout = 20000)
+  @Test(timeout = 30000)
   public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
     MockRM rm1 = new MockRM(conf) {
       @Override

TestFairScheduler.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -2590,37 +2591,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
     assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
   }
 
-  @Test
-  public void testReservationThatDoesntFit() throws IOException {
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    RMNode node1 =
-        MockNodes
-            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
-        "user1", 1);
-    scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(updateEvent);
-
-    FSAppAttempt app = scheduler.getSchedulerApp(attId);
-    assertEquals(0, app.getLiveContainers().size());
-    assertEquals(0, app.getReservedContainers().size());
-
-    createSchedulingRequestExistingApplication(1024, 2, attId);
-    scheduler.update();
-    scheduler.handle(updateEvent);
-
-    assertEquals(1, app.getLiveContainers().size());
-    assertEquals(0, app.getReservedContainers().size());
-  }
-
   @Test
   public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
     scheduler.init(conf);