YARN-4082. Container shouldn't be killed when node's label updated. Contributed by Wangda Tan.

(cherry picked from commit bf669b6d9f)
This commit is contained in:
Varun Vasudev 2015-09-01 14:19:11 +05:30
parent 2345627ad3
commit 855e0f8b00
7 changed files with 317 additions and 59 deletions

View File

@ -752,6 +752,9 @@ Release 2.8.0 - UNRELEASED
YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id
has not been reset synchronously. (Jun Gong via rohithsharmaks)
YARN-4082. Container shouldn't be killed when node's label updated.
(Wangda Tan via vvasudev)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -543,6 +544,32 @@ public abstract class AbstractCSQueue implements CSQueue {
}
}
/**
 * Adds {@code resourceToInc} to this queue's used resource under the given
 * node label and then applies the same update to the parent queue, if any.
 *
 * @param nodeLabel label to charge; {@code null} is treated as
 *          {@code RMNodeLabelsManager.NO_LABEL}
 * @param resourceToInc amount of resource to add
 * @param application not read at this level; {@code null} is passed when the
 *          update is forwarded to the parent queue
 */
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
    SchedulerApplicationAttempt application) {
  final String label =
      (nodeLabel != null) ? nodeLabel : RMNodeLabelsManager.NO_LABEL;

  // ResourceUsage has its own lock, so no additional locking is needed here.
  queueUsage.incUsed(label, resourceToInc);

  if (parent != null) {
    parent.incUsedResource(label, resourceToInc, null);
  }
}
/**
 * Subtracts {@code resourceToDec} from this queue's used resource under the
 * given node label and then applies the same update to the parent queue, if
 * any.
 *
 * @param nodeLabel label to release from; {@code null} is treated as
 *          {@code RMNodeLabelsManager.NO_LABEL}
 * @param resourceToDec amount of resource to subtract
 * @param application not read at this level; {@code null} is passed when the
 *          update is forwarded to the parent queue
 */
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
    SchedulerApplicationAttempt application) {
  final String label =
      (nodeLabel != null) ? nodeLabel : RMNodeLabelsManager.NO_LABEL;

  // ResourceUsage has its own lock, so no additional locking is needed here.
  queueUsage.decUsed(label, resourceToDec);

  if (parent != null) {
    parent.decUsedResource(label, resourceToDec, null);
  }
}
/**
* Return if the queue has pending resource on given nodePartition and
* schedulingMode.

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -287,4 +288,29 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* @return resourceUsage
*/
public ResourceUsage getQueueResourceUsage();
/**
 * Increase this queue's used resource on the given node partition.
 *
 * When the partition of a node is updated, the queue's resource usage is
 * updated if it has container(s) running on that node; this is the half of
 * that update which adds usage under a partition.
 *
 * @param nodePartition
 *          partition under which the usage is added
 * @param resourceToInc
 *          amount of resource to add
 * @param application
 *          attempt whose container(s) the resource belongs to; may be null
 *          when the call is forwarded to a parent queue
 */
public void incUsedResource(String nodePartition, Resource resourceToInc,
SchedulerApplicationAttempt application);
/**
 * Decrease this queue's used resource on the given node partition.
 *
 * When the partition of a node is updated, the queue's resource usage is
 * updated if it has container(s) running on that node; this is the half of
 * that update which removes usage from a partition.
 *
 * @param nodePartition
 *          partition from which the usage is removed
 * @param resourceToDec
 *          amount of resource to subtract
 * @param application
 *          attempt whose container(s) the resource belongs to; may be null
 *          when the call is forwarded to a parent queue
 */
public void decUsedResource(String nodePartition, Resource resourceToDec,
SchedulerApplicationAttempt application);
/**
 * When an outstanding resource is fulfilled or canceled, calling this will
 * decrease pending resource in a queue.
 *
 * @param nodeLabel
 *          node label asked by the application for the outstanding resource
 * @param resourceToDec
 *          amount of resource to subtract from the queue's pending resource
 */
public void decPendingResource(String nodeLabel, Resource resourceToDec);
}

View File

@ -1040,12 +1040,6 @@ public class CapacityScheduler extends
/**
* Process node labels update on a node.
*
* TODO: Currently capacity scheduler will kill containers on a node when
* labels on the node changed. It is a simply solution to ensure guaranteed
* capacity on labels of queues. When YARN-2498 completed, we can let
* preemption policy to decide if such containers need to be killed or just
* keep them running.
*/
private synchronized void updateLabelsOnNode(NodeId nodeId,
Set<String> newLabels) {
@ -1060,17 +1054,31 @@ public class CapacityScheduler extends
return;
}
// Kill running containers since label is changed
// Get new partition, we have only one partition per node
String newPartition;
if (newLabels.isEmpty()) {
newPartition = RMNodeLabelsManager.NO_LABEL;
} else {
newPartition = newLabels.iterator().next();
}
// old partition as well
String oldPartition = node.getPartition();
// Update resources of these containers
for (RMContainer rmContainer : node.getRunningContainers()) {
ContainerId containerId = rmContainer.getContainerId();
completedContainer(rmContainer,
ContainerStatus.newInstance(containerId,
ContainerState.COMPLETE,
String.format(
"Container=%s killed since labels on the node=%s changed",
containerId.toString(), nodeId.toString()),
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL);
FiCaSchedulerApp application =
getApplicationAttempt(rmContainer.getApplicationAttemptId());
if (null != application) {
application.nodePartitionUpdated(rmContainer, oldPartition,
newPartition);
} else {
LOG.warn("There's something wrong, some RMContainers running on"
+ " a node, but we cannot find SchedulerApplicationAttempt for it. Node="
+ node.getNodeID() + " applicationAttemptId="
+ rmContainer.getApplicationAttemptId());
continue;
}
}
// Unreserve container on this node

View File

@ -1262,6 +1262,22 @@ public class LeafQueue extends AbstractCSQueue {
}
}
/**
 * Adds {@code resourceToInc} to the usage of the user who owns
 * {@code application}, under {@code nodeLabel}, and then delegates to the
 * superclass implementation.
 *
 * @param nodeLabel label under which the resource is charged
 * @param resourceToInc amount of resource to add
 * @param application attempt owning the resource; dereferenced to find the
 *          user, so it must not be {@code null}
 */
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
    SchedulerApplicationAttempt application) {
  final String userName = application.getUser();
  // User-level accounting comes first, then the superclass update.
  getUser(userName).getResourceUsage().incUsed(nodeLabel, resourceToInc);
  super.incUsedResource(nodeLabel, resourceToInc, application);
}
/**
 * Subtracts {@code resourceToDec} from the usage of the user who owns
 * {@code application}, under {@code nodeLabel}, and then delegates to the
 * superclass implementation.
 *
 * @param nodeLabel label from which the resource is released
 * @param resourceToDec amount of resource to subtract
 * @param application attempt owning the resource; dereferenced to find the
 *          user, so it must not be {@code null}
 */
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
    SchedulerApplicationAttempt application) {
  final String userName = application.getUser();
  // User-level accounting comes first, then the superclass update.
  getUser(userName).getResourceUsage().decUsed(nodeLabel, resourceToDec);
  super.decUsedResource(nodeLabel, resourceToDec, application);
}
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();

View File

@ -443,4 +443,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulingMode, currentResourceLimits, reservedContainer);
}
}
/**
 * Moves the resource allocated to {@code rmContainer} from
 * {@code oldPartition} to {@code newPartition}, first in this attempt's own
 * resource usage and then in the usage tracked by its leaf queue.
 *
 * @param rmContainer container whose allocated resource is re-accounted
 * @param oldPartition partition the usage is removed from
 * @param newPartition partition the usage is added to
 */
public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
    String newPartition) {
  final Resource allocated = rmContainer.getAllocatedResource();

  // Attempt-level accounting.
  attemptResourceUsage.decUsed(oldPartition, allocated);
  attemptResourceUsage.incUsed(newPartition, allocated);

  // Queue-level accounting; this attempt is passed along as the owner.
  getCSLeafQueue().decUsedResource(oldPartition, allocated, this);
  getCSLeafQueue().incUsedResource(newPartition, allocated, this);
}
}

View File

@ -19,22 +19,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.junit.Assert;
import org.junit.Before;
@ -97,8 +104,18 @@ public class TestCapacitySchedulerNodeLabelUpdate {
.getMemory());
}
/**
 * Asserts that, in leaf queue {@code queueName}, user {@code userName} has
 * exactly {@code memory} of used memory recorded under {@code partition}.
 */
private void checkUserUsedResource(MockRM rm, String queueName,
    String userName, String partition, int memory) {
  LeafQueue leafQueue =
      (LeafQueue) ((CapacityScheduler) rm.getResourceScheduler())
          .getQueue(queueName);
  Assert.assertEquals(memory,
      leafQueue.getUser(userName).getResourceUsage().getUsed(partition)
          .getMemory());
}
@Test(timeout = 60000)
public void testResourceUsage() throws Exception {
public void testRequestContainerAfterNodePartitionUpdated()
throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
"z"));
@ -160,7 +177,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
}
@Test (timeout = 60000)
public void testNodeUpdate() throws Exception {
public void testResourceUsageWhenNodeUpdatesPartition()
throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
@ -183,8 +201,9 @@ public class TestCapacitySchedulerNodeLabelUpdate {
MockNM nm1 = rm.registerNode("h1:1234", 8000);
MockNM nm2 = rm.registerNode("h2:1234", 8000);
MockNM nm3 = rm.registerNode("h3:1234", 8000);
ContainerId containerId;
ContainerId containerId1;
ContainerId containerId2;
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
@ -193,9 +212,9 @@ public class TestCapacitySchedulerNodeLabelUpdate {
// request a container.
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
Assert.assertTrue(rm.waitForState(nm1, containerId,
containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
Assert.assertTrue(rm.waitForState(nm1, containerId2,
RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
@ -203,55 +222,205 @@ public class TestCapacitySchedulerNodeLabelUpdate {
checkUsedResource(rm, "a", 1024, "x");
checkUsedResource(rm, "a", 1024);
// change h1's label to z, container should be killed
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
toSet("z")));
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.KILLED, 10 * 1000));
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
// check used resource:
// queue-a used x=0G, ""=1G ("" not changed)
// change h1's label to z
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
toSet("z"))));
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 1024, "z");
checkUsedResource(rm, "a", 1024);
checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 1024, "z");
checkUsedResource(rm, "root", 1024);
checkUserUsedResource(rm, "a", "user", "x", 0);
checkUserUsedResource(rm, "a", "user", "z", 1024);
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getUsed("x").getMemory());
Assert.assertEquals(1024,
app.getAppAttemptResourceUsage().getUsed("z").getMemory());
// request a container with label = y
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
// change h1's label to y
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
toSet("y"))));
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 1024, "y");
checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 1024);
checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 1024, "y");
checkUsedResource(rm, "root", 0, "z");
checkUsedResource(rm, "root", 1024);
checkUserUsedResource(rm, "a", "user", "x", 0);
checkUserUsedResource(rm, "a", "user", "y", 1024);
checkUserUsedResource(rm, "a", "user", "z", 0);
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getUsed("x").getMemory());
Assert.assertEquals(1024,
app.getAppAttemptResourceUsage().getUsed("y").getMemory());
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getUsed("z").getMemory());
// change h1's label to no label
Set<String> emptyLabels = new HashSet<>();
Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
emptyLabels);
cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 2048);
checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 0, "y");
checkUsedResource(rm, "root", 0, "z");
checkUsedResource(rm, "root", 2048);
checkUserUsedResource(rm, "a", "user", "x", 0);
checkUserUsedResource(rm, "a", "user", "y", 0);
checkUserUsedResource(rm, "a", "user", "z", 0);
checkUserUsedResource(rm, "a", "user", "", 2048);
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getUsed("x").getMemory());
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getUsed("y").getMemory());
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getUsed("z").getMemory());
Assert.assertEquals(2048,
app.getAppAttemptResourceUsage().getUsed("").getMemory());
// Finish the two containers, we should see used resource becomes 0
cs.completedContainer(cs.getRMContainer(containerId2),
ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL);
cs.completedContainer(cs.getRMContainer(containerId1),
ContainerStatus.newInstance(containerId1, ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL);
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 0);
checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 0, "y");
checkUsedResource(rm, "root", 0, "z");
checkUsedResource(rm, "root", 0);
checkUserUsedResource(rm, "a", "user", "x", 0);
checkUserUsedResource(rm, "a", "user", "y", 0);
checkUserUsedResource(rm, "a", "user", "z", 0);
checkUserUsedResource(rm, "a", "user", "", 0);
rm.close();
}
@Test (timeout = 60000)
public void testComplexResourceUsageWhenNodeUpdatesPartition()
throws Exception {
/*
* This test is similar to testResourceUsageWhenNodeUpdatesPartition, this
* will include multiple applications, multiple users and multiple
* containers running on a single node, size of each container is 1G
*
* Node 1
* ------
* App1-container3
* App2-container2
* App2-Container3
*
* Node 2
* ------
* App2-container1
* App1-container1
* App1-container2
*/
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
// set mapping:
// h1 -> x
// h2 -> y
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 80000);
MockNM nm2 = rm.registerNode("h2:1234", 80000);
// app1
RMApp app1 = rm.submitApp(GB, "app", "u1", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
// c2 on n1, c3 on n2
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
ContainerId containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
Assert.assertTrue(rm.waitForState(nm2, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
// queue-a used y=1G, ""=1G
checkUsedResource(rm, "a", 1024, "y");
checkUsedResource(rm, "a", 1024);
// change h2's label to no label, container should be killed
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
CommonNodeLabelsManager.EMPTY_STRING_SET));
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.KILLED, 10 * 1000));
// check used resource:
// queue-a used x=0G, y=0G, ""=1G ("" not changed)
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 1024);
// app2
RMApp app2 = rm.submitApp(GB, "app", "u2", null, "a");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// c2/c3 on n1
am2.allocate("*", GB, 2, new ArrayList<ContainerId>(), "x");
containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// change h3's label to z, AM container should be killed
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
toSet("z")));
ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.KILLED, 10 * 1000));
RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
// queue-a used x=0G, y=0G, ""=1G ("" not changed)
// queue-a used x=3G, ""=3G
checkUsedResource(rm, "a", 3 * GB, "x");
checkUsedResource(rm, "a", 3 * GB);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
FiCaSchedulerApp application1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp application2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// change h1's label to z
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
toSet("z"))));
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 0);
checkUsedResource(rm, "a", 3 * GB, "z");
checkUsedResource(rm, "a", 3 * GB);
checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 3 * GB, "z");
checkUsedResource(rm, "root", 3 * GB);
checkUserUsedResource(rm, "a", "u1", "x", 0 * GB);
checkUserUsedResource(rm, "a", "u1", "z", 1 * GB);
checkUserUsedResource(rm, "a", "u1", "", 2 * GB);
checkUserUsedResource(rm, "a", "u2", "x", 0 * GB);
checkUserUsedResource(rm, "a", "u2", "z", 2 * GB);
checkUserUsedResource(rm, "a", "u2", "", 1 * GB);
Assert.assertEquals(0,
application1.getAppAttemptResourceUsage().getUsed("x").getMemory());
Assert.assertEquals(1 * GB,
application1.getAppAttemptResourceUsage().getUsed("z").getMemory());
Assert.assertEquals(2 * GB,
application1.getAppAttemptResourceUsage().getUsed("").getMemory());
Assert.assertEquals(0,
application2.getAppAttemptResourceUsage().getUsed("x").getMemory());
Assert.assertEquals(2 * GB,
application2.getAppAttemptResourceUsage().getUsed("z").getMemory());
Assert.assertEquals(1 * GB,
application2.getAppAttemptResourceUsage().getUsed("").getMemory());
rm.close();
}