YARN-3027. Scheduler should use totalAvailable resource from node instead of availableResource for maxAllocation. (adhoot via rkanter)

This commit is contained in:
Robert Kanter 2015-01-12 10:47:52 -08:00
parent 5b0d060d2a
commit ae7bf31fe1
3 changed files with 89 additions and 7 deletions

View File

@ -347,6 +347,9 @@ Release 2.7.0 - UNRELEASED
YARN-3014. Replaces labels on a host should update all NM's labels on that YARN-3014. Replaces labels on a host should update all NM's labels on that
host. (Wangda Tan via jianhe) host. (Wangda Tan via jianhe)
YARN-3027. Scheduler should use totalAvailable resource from node instead of
availableResource for maxAllocation. (adhoot via rkanter)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -576,23 +576,23 @@ public abstract class AbstractYarnScheduler
writeLock.lock(); writeLock.lock();
try { try {
if (add) { // added node if (add) { // added node
int nodeMemory = node.getAvailableResource().getMemory(); int nodeMemory = node.getTotalResource().getMemory();
if (nodeMemory > maxNodeMemory) { if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory; maxNodeMemory = nodeMemory;
maximumAllocation.setMemory(Math.min( maximumAllocation.setMemory(Math.min(
configuredMaximumAllocation.getMemory(), maxNodeMemory)); configuredMaximumAllocation.getMemory(), maxNodeMemory));
} }
int nodeVCores = node.getAvailableResource().getVirtualCores(); int nodeVCores = node.getTotalResource().getVirtualCores();
if (nodeVCores > maxNodeVCores) { if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores; maxNodeVCores = nodeVCores;
maximumAllocation.setVirtualCores(Math.min( maximumAllocation.setVirtualCores(Math.min(
configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
} }
} else { // removed node } else { // removed node
if (maxNodeMemory == node.getAvailableResource().getMemory()) { if (maxNodeMemory == node.getTotalResource().getMemory()) {
maxNodeMemory = -1; maxNodeMemory = -1;
} }
if (maxNodeVCores == node.getAvailableResource().getVirtualCores()) { if (maxNodeVCores == node.getTotalResource().getVirtualCores()) {
maxNodeVCores = -1; maxNodeVCores = -1;
} }
// We only have to iterate through the nodes if the current max memory // We only have to iterate through the nodes if the current max memory
@ -600,12 +600,12 @@ public abstract class AbstractYarnScheduler
if (maxNodeMemory == -1 || maxNodeVCores == -1) { if (maxNodeMemory == -1 || maxNodeVCores == -1) {
for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) { for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
int nodeMemory = int nodeMemory =
nodeEntry.getValue().getAvailableResource().getMemory(); nodeEntry.getValue().getTotalResource().getMemory();
if (nodeMemory > maxNodeMemory) { if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory; maxNodeMemory = nodeMemory;
} }
int nodeVCores = int nodeVCores =
nodeEntry.getValue().getAvailableResource().getVirtualCores(); nodeEntry.getValue().getTotalResource().getVirtualCores();
if (nodeVCores > maxNodeVCores) { if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores; maxNodeVCores = nodeVCores;
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -25,11 +27,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestB
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
public TestAbstractYarnScheduler(SchedulerType type) { public TestAbstractYarnScheduler(SchedulerType type) {
@ -210,4 +218,75 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
Assert.assertEquals(0, scheduler.getNumClusterNodes()); Assert.assertEquals(0, scheduler.getNumClusterNodes());
} }
@Test
public void testUpdateMaxAllocationUsesTotal() throws IOException {
final int configuredMaxVCores = 20;
final int configuredMaxMemory = 10 * 1024;
Resource configuredMaximumResource = Resource.newInstance
(configuredMaxMemory, configuredMaxVCores);
configureScheduler();
YarnConfiguration conf = getConf();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
configuredMaxVCores);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
configuredMaxMemory);
conf.setLong(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
0);
MockRM rm = new MockRM(conf);
try {
rm.start();
AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm
.getResourceScheduler();
Resource emptyResource = Resource.newInstance(0, 0);
Resource fullResource1 = Resource.newInstance(1024, 5);
Resource fullResource2 = Resource.newInstance(2048, 10);
SchedulerNode mockNode1 = mock(SchedulerNode.class);
when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
when(mockNode1.getAvailableResource()).thenReturn(emptyResource);
when(mockNode1.getTotalResource()).thenReturn(fullResource1);
SchedulerNode mockNode2 = mock(SchedulerNode.class);
when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081));
when(mockNode2.getAvailableResource()).thenReturn(emptyResource);
when(mockNode2.getTotalResource()).thenReturn(fullResource2);
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
scheduler.nodes = new HashMap<NodeId, SchedulerNode>();
scheduler.nodes.put(mockNode1.getNodeID(), mockNode1);
scheduler.updateMaximumAllocation(mockNode1, true);
verifyMaximumResourceCapability(fullResource1, scheduler);
scheduler.nodes.put(mockNode2.getNodeID(), mockNode2);
scheduler.updateMaximumAllocation(mockNode2, true);
verifyMaximumResourceCapability(fullResource2, scheduler);
scheduler.nodes.remove(mockNode2.getNodeID());
scheduler.updateMaximumAllocation(mockNode2, false);
verifyMaximumResourceCapability(fullResource1, scheduler);
scheduler.nodes.remove(mockNode1.getNodeID());
scheduler.updateMaximumAllocation(mockNode1, false);
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
} finally {
rm.stop();
}
}
private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
final Resource schedulerMaximumResourceCapability = scheduler
.getMaximumResourceCapability();
Assert.assertEquals(expectedMaximumResource.getMemory(),
schedulerMaximumResourceCapability.getMemory());
Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
schedulerMaximumResourceCapability.getVirtualCores());
}
} }