YARN-957. Fixed a bug in CapacityScheduler because of which requests that need more than a node's total capability were incorrectly allocated on that node causing apps to hang. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1520187 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5540d77e2f
commit
1e513bfc68
|
@ -152,6 +152,10 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
YARN-1077. Fixed TestContainerLaunch test failure on Windows. (Chuan Liu via
|
YARN-1077. Fixed TestContainerLaunch test failure on Windows. (Chuan Liu via
|
||||||
vinodkv)
|
vinodkv)
|
||||||
|
|
||||||
|
YARN-957. Fixed a bug in CapacityScheduler because of which requests that
|
||||||
|
need more than a node's total capability were incorrectly allocated on that
|
||||||
|
node causing apps to hang. (Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-22
|
Release 2.1.0-beta - 2013-08-22
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -67,4 +67,9 @@ public abstract class SchedulerNode {
|
||||||
*/
|
*/
|
||||||
public abstract int getNumContainers();
|
public abstract int getNumContainers();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get total resources on the node.
|
||||||
|
* @return total resources on the node.
|
||||||
|
*/
|
||||||
|
public abstract Resource getTotalResource();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1308,9 +1308,15 @@ public class LeafQueue implements CSQueue {
|
||||||
+ " request=" + request + " type=" + type);
|
+ " request=" + request + " type=" + type);
|
||||||
}
|
}
|
||||||
Resource capability = request.getCapability();
|
Resource capability = request.getCapability();
|
||||||
|
|
||||||
Resource available = node.getAvailableResource();
|
Resource available = node.getAvailableResource();
|
||||||
|
Resource totalResource = node.getTotalResource();
|
||||||
|
|
||||||
|
if (!Resources.fitsIn(capability, totalResource)) {
|
||||||
|
LOG.warn("Node : " + node.getNodeID()
|
||||||
|
+ " does not have sufficient resource for request : " + request
|
||||||
|
+ " node total capability : " + node.getTotalResource());
|
||||||
|
return Resources.none();
|
||||||
|
}
|
||||||
assert Resources.greaterThan(
|
assert Resources.greaterThan(
|
||||||
resourceCalculator, clusterResource, available, Resources.none());
|
resourceCalculator, clusterResource, available, Resources.none());
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
|
||||||
|
|
||||||
private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
|
private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
|
||||||
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
|
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
|
||||||
|
private Resource totalResourceCapability;
|
||||||
|
|
||||||
private volatile int numContainers;
|
private volatile int numContainers;
|
||||||
|
|
||||||
|
@ -65,6 +66,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
|
||||||
this.rmNode = node;
|
this.rmNode = node;
|
||||||
this.availableResource.setMemory(node.getTotalCapability().getMemory());
|
this.availableResource.setMemory(node.getTotalCapability().getMemory());
|
||||||
this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
|
this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
|
||||||
|
totalResourceCapability =
|
||||||
|
Resource.newInstance(node.getTotalCapability().getMemory(), node
|
||||||
|
.getTotalCapability().getVirtualCores());
|
||||||
if (usePortForNodeName) {
|
if (usePortForNodeName) {
|
||||||
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
|
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
|
||||||
} else {
|
} else {
|
||||||
|
@ -126,6 +130,11 @@ public class FiCaSchedulerNode extends SchedulerNode {
|
||||||
return this.usedResource;
|
return this.usedResource;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getTotalResource() {
|
||||||
|
return this.totalResourceCapability;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized boolean isValidContainer(Container c) {
|
private synchronized boolean isValidContainer(Container c) {
|
||||||
if (launchedContainers.containsKey(c.getId()))
|
if (launchedContainers.containsKey(c.getId()))
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class FSSchedulerNode extends SchedulerNode {
|
||||||
|
|
||||||
private Resource availableResource;
|
private Resource availableResource;
|
||||||
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
|
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
|
||||||
|
private Resource totalResourceCapability;
|
||||||
|
|
||||||
private volatile int numContainers;
|
private volatile int numContainers;
|
||||||
|
|
||||||
|
@ -68,6 +69,9 @@ public class FSSchedulerNode extends SchedulerNode {
|
||||||
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
|
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
|
||||||
this.rmNode = node;
|
this.rmNode = node;
|
||||||
this.availableResource = Resources.clone(node.getTotalCapability());
|
this.availableResource = Resources.clone(node.getTotalCapability());
|
||||||
|
totalResourceCapability =
|
||||||
|
Resource.newInstance(node.getTotalCapability().getMemory(), node
|
||||||
|
.getTotalCapability().getVirtualCores());
|
||||||
if (usePortForNodeName) {
|
if (usePortForNodeName) {
|
||||||
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
|
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
|
||||||
} else {
|
} else {
|
||||||
|
@ -173,6 +177,11 @@ public class FSSchedulerNode extends SchedulerNode {
|
||||||
Resources.subtractFrom(usedResource, resource);
|
Resources.subtractFrom(usedResource, resource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getTotalResource() {
|
||||||
|
return this.totalResourceCapability;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void deductAvailableResource(Resource resource) {
|
private synchronized void deductAvailableResource(Resource resource) {
|
||||||
if (resource == null) {
|
if (resource == null) {
|
||||||
LOG.error("Invalid deduction of null resource for "
|
LOG.error("Invalid deduction of null resource for "
|
||||||
|
|
|
@ -232,6 +232,14 @@ public class MockRM extends ResourceManager {
|
||||||
return nm;
|
return nm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MockNM registerNode(String nodeIdStr, int memory, int vCores)
|
||||||
|
throws Exception {
|
||||||
|
MockNM nm =
|
||||||
|
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService());
|
||||||
|
nm.registerNode();
|
||||||
|
return nm;
|
||||||
|
}
|
||||||
|
|
||||||
public void sendNodeStarted(MockNM nm) throws Exception {
|
public void sendNodeStarted(MockNM nm) throws Exception {
|
||||||
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
||||||
nm.getNodeId());
|
nm.getNodeId());
|
||||||
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestContainerAllocation {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
|
||||||
|
|
||||||
|
private final int GB = 1024;
|
||||||
|
|
||||||
|
@Test(timeout = 3000000)
|
||||||
|
public void testExcessReservationThanNodeManagerCapacity() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// Register node1
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4);
|
||||||
|
MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4);
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
// wait..
|
||||||
|
int waitCount = 20;
|
||||||
|
int size = rm.getRMContext().getRMNodes().size();
|
||||||
|
while ((size = rm.getRMContext().getRMNodes().size()) != 2
|
||||||
|
&& waitCount-- > 0) {
|
||||||
|
LOG.info("Waiting for node managers to register : " + size);
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(2, rm.getRMContext().getRMNodes().size());
|
||||||
|
// Submit an application
|
||||||
|
RMApp app1 = rm.submitApp(128);
|
||||||
|
|
||||||
|
// kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||||
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
|
LOG.info("sending container requests ");
|
||||||
|
am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
|
||||||
|
AllocateResponse alloc1Response = am1.schedule(); // send the request
|
||||||
|
|
||||||
|
// kick the scheduler
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
int waitCounter = 20;
|
||||||
|
LOG.info("heartbeating nm1");
|
||||||
|
while (alloc1Response.getAllocatedContainers().size() < 1
|
||||||
|
&& waitCounter-- > 0) {
|
||||||
|
LOG.info("Waiting for containers to be created for app 1...");
|
||||||
|
Thread.sleep(500);
|
||||||
|
alloc1Response = am1.schedule();
|
||||||
|
}
|
||||||
|
LOG.info("received container : "
|
||||||
|
+ alloc1Response.getAllocatedContainers().size());
|
||||||
|
|
||||||
|
// No container should be allocated.
|
||||||
|
// Internally it should not been reserved.
|
||||||
|
Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0);
|
||||||
|
|
||||||
|
LOG.info("heartbeating nm2");
|
||||||
|
waitCounter = 20;
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
while (alloc1Response.getAllocatedContainers().size() < 1
|
||||||
|
&& waitCounter-- > 0) {
|
||||||
|
LOG.info("Waiting for containers to be created for app 1...");
|
||||||
|
Thread.sleep(500);
|
||||||
|
alloc1Response = am1.schedule();
|
||||||
|
}
|
||||||
|
LOG.info("received container : "
|
||||||
|
+ alloc1Response.getAllocatedContainers().size());
|
||||||
|
Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1);
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue