From 88f022da245c5d34997494a822124d07f8a5f72f Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 22 Dec 2014 16:50:15 -0800 Subject: [PATCH] YARN-2920. Changed CapacityScheduler to kill containers on nodes where node labels are changed. Contributed by Wangda Tan (cherry picked from commit fdf042dfffa4d2474e3cac86cfb8fe9ee4648beb) (cherry picked from commit 411836b74c6c02c0b5aebbbce29c209d93db1de2) --- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 3 +- .../yarn/sls/scheduler/RMNodeWrapper.java | 3 +- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/ResourceManager.java | 1 + .../nodelabels/RMNodeLabelsManager.java | 23 +++ .../resourcemanager/rmnode/RMNodeImpl.java | 6 +- .../scheduler/SchedulerNode.java | 24 ++- .../scheduler/capacity/AbstractCSQueue.java | 5 + .../scheduler/capacity/CSQueue.java | 8 + .../scheduler/capacity/CapacityScheduler.java | 88 ++++++-- .../scheduler/capacity/LeafQueue.java | 26 +-- .../scheduler/capacity/ParentQueue.java | 24 ++- .../common/fica/FiCaSchedulerNode.java | 11 +- .../event/NodeLabelsUpdateSchedulerEvent.java | 37 ++++ .../scheduler/event/SchedulerEventType.java | 1 + .../server/resourcemanager/MockNodes.java | 3 +- .../TestCapacitySchedulerNodeLabelUpdate.java | 193 ++++++++++++++++++ 17 files changed, 414 insertions(+), 45 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index fdddcf4097c..ee6eb7b5555 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode .UpdatedContainerInfo; @@ -162,7 +163,7 @@ public class NodeInfo { @Override public Set getNodeLabels() { - return null; + return RMNodeLabelsManager.EMPTY_STRING_SET; } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 3b185ae928c..b64be1b61a2 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode .UpdatedContainerInfo; @@ -150,6 +151,6 @@ public class RMNodeWrapper implements RMNode { @Override public Set getNodeLabels() { - return null; + return RMNodeLabelsManager.EMPTY_STRING_SET; } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8244d61b0d1..fa1e1209f6f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -171,6 +171,9 @@ Release 2.6.1 - UNRELEASED YARN-3733. Fix DominantRC#compare() does not work as expected if cluster resource is empty. (Rohith Sharmaks via wangda) + YARN-2920. Changed CapacityScheduler to kill containers on nodes where + node labels are changed. (Wangda Tan via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index ea762c0d0ed..353851c99cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -430,6 +430,7 @@ public class ResourceManager extends CompositeService implements Recoverable { rmContext.setAMFinishingMonitor(amFinishingMonitor); RMNodeLabelsManager nlm = createNodeLabelManager(); + nlm.setRMContext(rmContext); addService(nlm); rmContext.setNodeLabelManager(nlm); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index ba1727c2c18..d9828e2a21a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -35,7 +35,10 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.ImmutableSet; @@ -57,6 +60,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { new ConcurrentHashMap(); protected AccessControlList adminAcl; + private RMContext rmContext = null; + @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); @@ -331,6 +336,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { return map; } + @SuppressWarnings("unchecked") private void updateResourceMappings(Map before, Map after) { // Get NMs in before only @@ -341,6 +347,10 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { for (Entry entry : after.entrySet()) { allNMs.addAll(entry.getValue().nms.keySet()); } + + // Map used to notify RM + Map> newNodeToLabelsMap = + new HashMap>(); // traverse all nms for (NodeId nodeId : allNMs) { @@ -379,6 +389,9 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { Node newNM; if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) { Set newLabels = getLabelsByNode(nodeId, after); + + newNodeToLabelsMap.put(nodeId, ImmutableSet.copyOf(newLabels)); + // no label in the past if (newLabels.isEmpty()) { // update labels @@ -405,6 +418,12 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { } } } + + // Notify RM + if (rmContext != null && rmContext.getDispatcher() != null) { + rmContext.getDispatcher().getEventHandler().handle( + new NodeLabelsUpdateSchedulerEvent(newNodeToLabelsMap)); + } } public Resource getResourceByLabel(String label, Resource clusterResource) { @@ -452,4 +471,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { } return false; } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b92f399f411..b57b7cc6a0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -860,9 +861,10 @@ public class RMNodeImpl implements RMNode, EventHandler { @Override public Set getNodeLabels() { - if (context.getNodeLabelManager() == null) { + RMNodeLabelsManager nlm = context.getNodeLabelManager(); + if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) { return CommonNodeLabelsManager.EMPTY_STRING_SET; } - return context.getNodeLabelManager().getLabelsOnNode(nodeId); + return nlm.getLabelsOnNode(nodeId); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index f4d8731a012..d1922ee0880 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,11 +34,14 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.ImmutableSet; + /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. @@ -61,8 +65,11 @@ public abstract class SchedulerNode { private final RMNode rmNode; private final String nodeName; - - public SchedulerNode(RMNode node, boolean usePortForNodeName) { + + private volatile Set labels = null; + + public SchedulerNode(RMNode node, boolean usePortForNodeName, + Set labels) { this.rmNode = node; this.availableResource = Resources.clone(node.getTotalCapability()); this.totalResourceCapability = Resources.clone(node.getTotalCapability()); @@ -71,6 +78,11 @@ public abstract class SchedulerNode { } else { nodeName = rmNode.getHostName(); } + this.labels = ImmutableSet.copyOf(labels); + } + + public SchedulerNode(RMNode node, boolean usePortForNodeName) { + this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET); } public RMNode getRMNode() { @@ -275,4 +287,12 @@ public abstract class SchedulerNode { } allocateContainer(rmContainer); } + + public Set getLabels() { + return labels; + } + + public void updateLabels(Set labels) { + this.labels = labels; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index fc0fbb42387..1f6696ddd10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -447,4 +447,9 @@ public abstract class AbstractCSQueue implements CSQueue { public Map getACLs() { return acls; } + + @Private + public Resource getUsedResourceByLabel(String nodeLabel) { + return usedResourcesByNodeLabels.get(nodeLabel); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 6438d6c83b8..07a7e0e9a2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -143,6 +143,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { */ public Resource getUsedResources(); + /** + * Get the currently utilized resources which allocated at nodes with label + * specified + * + * @return used resources by the queue and it's children + */ + public Resource getUsedResourceByLabel(String nodeLabel); + /** * Get the current run-state of the queue * @return current run-state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 042c83ce61b..c49c498dd35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,15 +25,15 @@ import java.util.Collection; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.HashSet; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,12 +48,15 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -79,12 +82,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -93,6 +99,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -106,11 +113,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; - @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -966,6 +968,51 @@ public class CapacityScheduler extends updateNodeResource(nm, resourceOption); root.updateClusterResource(clusterResource); } + + /** + * Process node labels update on a node. + * + * TODO: Currently capacity scheduler will kill containers on a node when + * labels on the node changed. It is a simply solution to ensure guaranteed + * capacity on labels of queues. When YARN-2498 completed, we can let + * preemption policy to decide if such containers need to be killed or just + * keep them running. + */ + private synchronized void updateLabelsOnNode(NodeId nodeId, + Set newLabels) { + FiCaSchedulerNode node = nodes.get(nodeId); + if (null == node) { + return; + } + + // labels is same, we don't need do update + if (node.getLabels().size() == newLabels.size() + && node.getLabels().containsAll(newLabels)) { + return; + } + + // Kill running containers since label is changed + for (RMContainer rmContainer : node.getRunningContainers()) { + ContainerId containerId = rmContainer.getContainerId(); + completedContainer(rmContainer, + ContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, + String.format( + "Container=%s killed since labels on the node=%s changed", + containerId.toString(), nodeId.toString()), + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL); + } + + // Unreserve container on this node + RMContainer reservedContainer = node.getReservedContainer(); + if (null != reservedContainer) { + dropContainerReservation(reservedContainer); + } + + // Update node labels after we've done this + node.updateLabels(newLabels); + } private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { if (rmContext.isWorkPreservingRecoveryEnabled() @@ -1049,6 +1096,19 @@ public class CapacityScheduler extends nodeResourceUpdatedEvent.getResourceOption()); } break; + case NODE_LABELS_UPDATE: + { + NodeLabelsUpdateSchedulerEvent labelUpdateEvent = + (NodeLabelsUpdateSchedulerEvent) event; + + for (Entry> entry : labelUpdateEvent + .getUpdatedNodeToLabels().entrySet()) { + NodeId id = entry.getKey(); + Set labels = entry.getValue(); + updateLabelsOnNode(id, labels); + } + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; @@ -1117,14 +1177,8 @@ public class CapacityScheduler extends } private synchronized void addNode(RMNode nodeManager) { - // update this node to node label manager - if (labelManager != null) { - labelManager.activateNode(nodeManager.getNodeID(), - nodeManager.getTotalCapability()); - } - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, - usePortForNodeName)); + usePortForNodeName, nodeManager.getNodeLabels())); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); int numNodes = numNodeManagers.incrementAndGet(); @@ -1135,6 +1189,12 @@ public class CapacityScheduler extends if (scheduleAsynchronously && numNodes == 1) { asyncSchedulerThread.beginSchedule(); } + + // update this node to node label manager + if (labelManager != null) { + labelManager.activateNode(nodeManager.getNodeID(), + nodeManager.getTotalCapability()); + } } private synchronized void removeNode(RMNode nodeInfo) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 60b5a593798..509b0f26d53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -809,7 +809,7 @@ public class LeafQueue extends AbstractCSQueue { // if our queue cannot access this node, just return if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, - labelManager.getLabelsOnNode(node.getNodeID()))) { + node.getLabels())) { return NULL_ASSIGNMENT; } @@ -878,7 +878,7 @@ public class LeafQueue extends AbstractCSQueue { // Check queue max-capacity limit if (!canAssignToThisQueue(clusterResource, required, - labelManager.getLabelsOnNode(node.getNodeID()), application, true)) { + node.getLabels(), application, true)) { return NULL_ASSIGNMENT; } @@ -911,7 +911,7 @@ public class LeafQueue extends AbstractCSQueue { // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned, - labelManager.getLabelsOnNode(node.getNodeID())); + node.getLabels()); // Don't reset scheduling opportunities for non-local assignments // otherwise the app will be delayed for each non-local assignment. @@ -1561,7 +1561,7 @@ public class LeafQueue extends AbstractCSQueue { // check if the resource request can access the label if (!SchedulerUtils.checkNodeLabelExpression( - labelManager.getLabelsOnNode(node.getNodeID()), + node.getLabels(), request.getNodeLabelExpression())) { // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed @@ -1752,8 +1752,7 @@ public class LeafQueue extends AbstractCSQueue { // Book-keeping if (removed) { releaseResource(clusterResource, application, - container.getResource(), - labelManager.getLabelsOnNode(node.getNodeID())); + container.getResource(), node.getLabels()); LOG.info("completedContainer" + " container=" + container + " queue=" + this + @@ -1950,9 +1949,10 @@ public class LeafQueue extends AbstractCSQueue { } // Careful! Locking order is important! synchronized (this) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1989,9 +1989,10 @@ public class LeafQueue extends AbstractCSQueue { public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -2006,9 +2007,10 @@ public class LeafQueue extends AbstractCSQueue { public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer() - .getNodeId())); + .getResource(), node.getLabels()); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6ffaf4c32b7..fd598f2e81c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue { private final boolean rootQueue; final Comparator queueComparator; volatile int numApplications; + private final CapacitySchedulerContext scheduler; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -80,7 +81,7 @@ public class ParentQueue extends AbstractCSQueue { public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); - + this.scheduler = cs; this.queueComparator = cs.getQueueComparator(); this.rootQueue = (parent == null); @@ -420,10 +421,10 @@ public class ParentQueue extends AbstractCSQueue { Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); + Set nodeLabels = node.getLabels(); // if our queue cannot access this node, just return - if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, - labelManager.getLabelsOnNode(node.getNodeID()))) { + if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) { return assignment; } @@ -434,7 +435,6 @@ public class ParentQueue extends AbstractCSQueue { } boolean localNeedToUnreserve = false; - Set nodeLabels = labelManager.getLabelsOnNode(node.getNodeID()); // Are we over maximum-capacity for this queue? if (!canAssignToThisQueue(clusterResource, nodeLabels)) { @@ -641,7 +641,7 @@ public class ParentQueue extends AbstractCSQueue { // Book keeping synchronized (this) { super.releaseResource(clusterResource, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(node.getNodeID())); + .getResource(), node.getLabels()); LOG.info("completedContainer" + " queue=" + getQueueName() + @@ -703,9 +703,10 @@ public class ParentQueue extends AbstractCSQueue { } // Careful! Locking order is important! synchronized (this) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); @@ -730,9 +731,10 @@ public class ParentQueue extends AbstractCSQueue { public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), labelManager.getLabelsOnNode(rmContainer - .getContainer().getNodeId())); + .getResource(), node.getLabels()); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" @@ -748,9 +750,11 @@ public class ParentQueue extends AbstractCSQueue { public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { + FiCaSchedulerNode node = + scheduler.getNode(rmContainer.getContainer().getNodeId()); super.releaseResource(clusterResource, rmContainer.getContainer().getResource(), - labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId())); + node.getLabels()); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 5227aac195d..fe6db47d283 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -32,9 +34,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); + + public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName, + Set nodeLabels) { + super(node, usePortForNodeName, nodeLabels); + } public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { - super(node, usePortForNodeName); + this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java new file mode 100644 index 00000000000..7723e25c6ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeId; + +public class NodeLabelsUpdateSchedulerEvent extends SchedulerEvent { + private Map> nodeToLabels; + + public NodeLabelsUpdateSchedulerEvent(Map> nodeToLabels) { + super(SchedulerEventType.NODE_LABELS_UPDATE); + this.nodeToLabels = nodeToLabels; + } + + public Map> getUpdatedNodeToLabels() { + return nodeToLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 062f831c4ca..13aecb3f2de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -25,6 +25,7 @@ public enum SchedulerEventType { NODE_REMOVED, NODE_UPDATE, NODE_RESOURCE_UPDATE, + NODE_LABELS_UPDATE, // Source: RMApp APP_ADDED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 228f2006fda..278c151ff66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -206,7 +207,7 @@ public class MockNodes { @Override public Set getNodeLabels() { - return null; + return RMNodeLabelsManager.EMPTY_STRING_SET; } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java new file mode 100644 index 00000000000..261fa013073 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestCapacitySchedulerNodeLabelUpdate { + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new MemoryRMNodeLabelsManager(); + mgr.init(conf); + } + + private Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 100); + conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z")); + conf.setCapacityByLabel(A, "x", 100); + conf.setCapacityByLabel(A, "y", 100); + conf.setCapacityByLabel(A, "z", 100); + + return conf; + } + + private Set toSet(String... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + private void checkUsedResource(MockRM rm, String queueName, int memory) { + checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL); + } + + private void checkUsedResource(MockRM rm, String queueName, int memory, + String label) { + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = scheduler.getQueue(queueName); + Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory()); + } + + @Test (timeout = 30000) + public void testNodeUpdate() throws Exception { + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 8000); + MockNM nm2 = rm.registerNode("h2:1234", 8000); + MockNM nm3 = rm.registerNode("h3:1234", 8000); + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm.submitApp(GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3); + + // request a container. + am1.allocate("*", GB, 1, new ArrayList(), "x"); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // check used resource: + // queue-a used x=1G, ""=1G + checkUsedResource(rm, "a", 1024, "x"); + checkUsedResource(rm, "a", 1024); + + // change h1's label to z, container should be killed + mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + toSet("z"))); + Assert.assertTrue(rm.waitForState(nm1, containerId, + RMContainerState.KILLED, 10 * 1000)); + + // check used resource: + // queue-a used x=0G, ""=1G ("" not changed) + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 1024); + + // request a container with label = y + am1.allocate("*", GB, 1, new ArrayList(), "y"); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + Assert.assertTrue(rm.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // check used resource: + // queue-a used y=1G, ""=1G + checkUsedResource(rm, "a", 1024, "y"); + checkUsedResource(rm, "a", 1024); + + // change h2's label to no label, container should be killed + mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0), + CommonNodeLabelsManager.EMPTY_STRING_SET)); + Assert.assertTrue(rm.waitForState(nm1, containerId, + RMContainerState.KILLED, 10 * 1000)); + + // check used resource: + // queue-a used x=0G, y=0G, ""=1G ("" not changed) + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 0, "y"); + checkUsedResource(rm, "a", 1024); + + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + + // change h3's label to z, AM container should be killed + mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0), + toSet("z"))); + Assert.assertTrue(rm.waitForState(nm1, containerId, + RMContainerState.KILLED, 10 * 1000)); + + // check used resource: + // queue-a used x=0G, y=0G, ""=1G ("" not changed) + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 0, "y"); + checkUsedResource(rm, "a", 0); + + rm.close(); + } +}