YARN-3079. Scheduler should also update maximumAllocation when updateNodeResource. (Zhihai Xu via wangda)
This commit is contained in:
parent
f37849188b
commit
7882bc0f14
|
@ -434,6 +434,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-3103. AMRMClientImpl does not update AMRM token properly. (Jason Lowe
|
||||
via jianhe)
|
||||
|
||||
YARN-3079. Scheduler should also update maximumAllocation when updateNodeResource.
|
||||
(Zhihai Xu via wangda)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.io.IOException;
|
|||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -82,8 +84,9 @@ public abstract class AbstractYarnScheduler
|
|||
private Resource configuredMaximumAllocation;
|
||||
private int maxNodeMemory = -1;
|
||||
private int maxNodeVCores = -1;
|
||||
private ReentrantReadWriteLock maximumAllocationLock =
|
||||
new ReentrantReadWriteLock();
|
||||
private final ReadLock maxAllocReadLock;
|
||||
private final WriteLock maxAllocWriteLock;
|
||||
|
||||
private boolean useConfiguredMaximumAllocationOnly = true;
|
||||
private long configuredMaximumAllocationWaitTime;
|
||||
|
||||
|
@ -103,6 +106,9 @@ public abstract class AbstractYarnScheduler
|
|||
*/
|
||||
public AbstractYarnScheduler(String name) {
|
||||
super(name);
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.maxAllocReadLock = lock.readLock();
|
||||
this.maxAllocWriteLock = lock.writeLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -157,8 +163,7 @@ public abstract class AbstractYarnScheduler
|
|||
@Override
|
||||
public Resource getMaximumResourceCapability() {
|
||||
Resource maxResource;
|
||||
ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
|
||||
readLock.lock();
|
||||
maxAllocReadLock.lock();
|
||||
try {
|
||||
if (useConfiguredMaximumAllocationOnly) {
|
||||
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
|
||||
|
@ -170,22 +175,20 @@ public abstract class AbstractYarnScheduler
|
|||
maxResource = Resources.clone(maximumAllocation);
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
maxAllocReadLock.unlock();
|
||||
}
|
||||
return maxResource;
|
||||
}
|
||||
|
||||
protected void initMaximumResourceCapability(Resource maximumAllocation) {
|
||||
ReentrantReadWriteLock.WriteLock writeLock =
|
||||
maximumAllocationLock.writeLock();
|
||||
writeLock.lock();
|
||||
maxAllocWriteLock.lock();
|
||||
try {
|
||||
if (this.configuredMaximumAllocation == null) {
|
||||
this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
|
||||
this.maximumAllocation = Resources.clone(maximumAllocation);
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
maxAllocWriteLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -535,19 +538,24 @@ public abstract class AbstractYarnScheduler
|
|||
*/
|
||||
public synchronized void updateNodeResource(RMNode nm,
|
||||
ResourceOption resourceOption) {
|
||||
|
||||
SchedulerNode node = getSchedulerNode(nm.getNodeID());
|
||||
Resource newResource = resourceOption.getResource();
|
||||
Resource oldResource = node.getTotalResource();
|
||||
if(!oldResource.equals(newResource)) {
|
||||
// Log resource change
|
||||
LOG.info("Update resource on node: " + node.getNodeName()
|
||||
LOG.info("Update resource on node: " + node.getNodeName()
|
||||
+ " from: " + oldResource + ", to: "
|
||||
+ newResource);
|
||||
|
||||
nodes.remove(nm.getNodeID());
|
||||
updateMaximumAllocation(node, false);
|
||||
|
||||
// update resource to node
|
||||
node.setTotalResource(newResource);
|
||||
|
||||
|
||||
nodes.put(nm.getNodeID(), (N)node);
|
||||
updateMaximumAllocation(node, true);
|
||||
|
||||
// update resource to clusterResource
|
||||
Resources.subtractFrom(clusterResource, oldResource);
|
||||
Resources.addTo(clusterResource, newResource);
|
||||
|
@ -571,28 +579,27 @@ public abstract class AbstractYarnScheduler
|
|||
}
|
||||
|
||||
protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
|
||||
ReentrantReadWriteLock.WriteLock writeLock =
|
||||
maximumAllocationLock.writeLock();
|
||||
writeLock.lock();
|
||||
Resource totalResource = node.getTotalResource();
|
||||
maxAllocWriteLock.lock();
|
||||
try {
|
||||
if (add) { // added node
|
||||
int nodeMemory = node.getTotalResource().getMemory();
|
||||
int nodeMemory = totalResource.getMemory();
|
||||
if (nodeMemory > maxNodeMemory) {
|
||||
maxNodeMemory = nodeMemory;
|
||||
maximumAllocation.setMemory(Math.min(
|
||||
configuredMaximumAllocation.getMemory(), maxNodeMemory));
|
||||
}
|
||||
int nodeVCores = node.getTotalResource().getVirtualCores();
|
||||
int nodeVCores = totalResource.getVirtualCores();
|
||||
if (nodeVCores > maxNodeVCores) {
|
||||
maxNodeVCores = nodeVCores;
|
||||
maximumAllocation.setVirtualCores(Math.min(
|
||||
configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
|
||||
}
|
||||
} else { // removed node
|
||||
if (maxNodeMemory == node.getTotalResource().getMemory()) {
|
||||
if (maxNodeMemory == totalResource.getMemory()) {
|
||||
maxNodeMemory = -1;
|
||||
}
|
||||
if (maxNodeVCores == node.getTotalResource().getVirtualCores()) {
|
||||
if (maxNodeVCores == totalResource.getVirtualCores()) {
|
||||
maxNodeVCores = -1;
|
||||
}
|
||||
// We only have to iterate through the nodes if the current max memory
|
||||
|
@ -625,7 +632,7 @@ public abstract class AbstractYarnScheduler
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
maxAllocWriteLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
|
@ -279,6 +280,67 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxAllocationAfterUpdateNodeResource() throws IOException {
|
||||
final int configuredMaxVCores = 20;
|
||||
final int configuredMaxMemory = 10 * 1024;
|
||||
Resource configuredMaximumResource = Resource.newInstance
|
||||
(configuredMaxMemory, configuredMaxVCores);
|
||||
|
||||
configureScheduler();
|
||||
YarnConfiguration conf = getConf();
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
configuredMaxVCores);
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
configuredMaxMemory);
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
|
||||
0);
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm
|
||||
.getResourceScheduler();
|
||||
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
|
||||
|
||||
Resource resource1 = Resource.newInstance(2048, 5);
|
||||
Resource resource2 = Resource.newInstance(4096, 10);
|
||||
Resource resource3 = Resource.newInstance(512, 1);
|
||||
Resource resource4 = Resource.newInstance(1024, 2);
|
||||
|
||||
RMNode node1 = MockNodes.newNodeInfo(
|
||||
0, resource1, 1, "127.0.0.2");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node1));
|
||||
RMNode node2 = MockNodes.newNodeInfo(
|
||||
0, resource3, 2, "127.0.0.3");
|
||||
scheduler.handle(new NodeAddedSchedulerEvent(node2));
|
||||
verifyMaximumResourceCapability(resource1, scheduler);
|
||||
|
||||
// increase node1 resource
|
||||
scheduler.updateNodeResource(node1, ResourceOption.newInstance(
|
||||
resource2, 0));
|
||||
verifyMaximumResourceCapability(resource2, scheduler);
|
||||
|
||||
// decrease node1 resource
|
||||
scheduler.updateNodeResource(node1, ResourceOption.newInstance(
|
||||
resource1, 0));
|
||||
verifyMaximumResourceCapability(resource1, scheduler);
|
||||
|
||||
// increase node2 resource
|
||||
scheduler.updateNodeResource(node2, ResourceOption.newInstance(
|
||||
resource4, 0));
|
||||
verifyMaximumResourceCapability(resource1, scheduler);
|
||||
|
||||
// decrease node2 resource
|
||||
scheduler.updateNodeResource(node2, ResourceOption.newInstance(
|
||||
resource3, 0));
|
||||
verifyMaximumResourceCapability(resource1, scheduler);
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyMaximumResourceCapability(
|
||||
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
|
||||
|
||||
|
|
Loading…
Reference in New Issue