YARN-2920. Changed CapacityScheduler to kill containers on nodes where node labels are changed. Contributed by Wangda Tan

This commit is contained in:
Jian He 2014-12-22 16:50:15 -08:00
parent 2cf90a2c33
commit fdf042dfff
17 changed files with 410 additions and 37 deletions

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.UpdatedContainerInfo;
@ -162,7 +163,7 @@ public class NodeInfo {
@Override
public Set<String> getNodeLabels() {
return null;
return RMNodeLabelsManager.EMPTY_STRING_SET;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.UpdatedContainerInfo;
@ -150,6 +151,6 @@ public class RMNodeWrapper implements RMNode {
@Override
public Set<String> getNodeLabels() {
return null;
return RMNodeLabelsManager.EMPTY_STRING_SET;
}
}

View File

@ -263,6 +263,9 @@ Release 2.7.0 - UNRELEASED
YARN-2939. Fix new findbugs warnings in hadoop-yarn-common. (Li Lu via junping_du)
YARN-2920. Changed CapacityScheduler to kill containers on nodes where
node labels are changed. (Wangda Tan via jianhe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -426,6 +426,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setAMFinishingMonitor(amFinishingMonitor);
RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext);
addService(nlm);
rmContext.setNodeLabelManager(nlm);

View File

@ -35,7 +35,10 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;
@ -57,6 +60,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
new ConcurrentHashMap<String, Queue>();
protected AccessControlList adminAcl;
private RMContext rmContext = null;
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
@ -331,6 +336,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
return map;
}
@SuppressWarnings("unchecked")
private void updateResourceMappings(Map<String, Host> before,
Map<String, Host> after) {
// Get NMs in before only
@ -341,6 +347,10 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
for (Entry<String, Host> entry : after.entrySet()) {
allNMs.addAll(entry.getValue().nms.keySet());
}
// Map used to notify RM
Map<NodeId, Set<String>> newNodeToLabelsMap =
new HashMap<NodeId, Set<String>>();
// traverse all nms
for (NodeId nodeId : allNMs) {
@ -379,6 +389,9 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
Node newNM;
if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
Set<String> newLabels = getLabelsByNode(nodeId, after);
newNodeToLabelsMap.put(nodeId, ImmutableSet.copyOf(newLabels));
// no label in the past
if (newLabels.isEmpty()) {
// update labels
@ -405,6 +418,12 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
}
}
// Notify RM
if (rmContext != null && rmContext.getDispatcher() != null) {
rmContext.getDispatcher().getEventHandler().handle(
new NodeLabelsUpdateSchedulerEvent(newNodeToLabelsMap));
}
}
public Resource getResourceByLabel(String label, Resource clusterResource) {
@ -452,4 +471,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
return false;
}
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -858,9 +859,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public Set<String> getNodeLabels() {
if (context.getNodeLabelManager() == null) {
RMNodeLabelsManager nlm = context.getNodeLabelManager();
if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) {
return CommonNodeLabelsManager.EMPTY_STRING_SET;
}
return context.getNodeLabelManager().getLabelsOnNode(nodeId);
return nlm.getLabelsOnNode(nodeId);
}
}

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -33,11 +34,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@ -61,8 +65,11 @@ public abstract class SchedulerNode {
private final RMNode rmNode;
private final String nodeName;
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
private volatile Set<String> labels = null;
public SchedulerNode(RMNode node, boolean usePortForNodeName,
Set<String> labels) {
this.rmNode = node;
this.availableResource = Resources.clone(node.getTotalCapability());
this.totalResourceCapability = Resources.clone(node.getTotalCapability());
@ -71,6 +78,11 @@ public abstract class SchedulerNode {
} else {
nodeName = rmNode.getHostName();
}
this.labels = ImmutableSet.copyOf(labels);
}
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
}
public RMNode getRMNode() {
@ -274,4 +286,12 @@ public abstract class SchedulerNode {
}
allocateContainer(rmContainer);
}
public Set<String> getLabels() {
return labels;
}
public void updateLabels(Set<String> labels) {
this.labels = labels;
}
}

View File

@ -447,4 +447,9 @@ public abstract class AbstractCSQueue implements CSQueue {
public Map<QueueACL, AccessControlList> getACLs() {
return acls;
}
@Private
public Resource getUsedResourceByLabel(String nodeLabel) {
return usedResourcesByNodeLabels.get(nodeLabel);
}
}

View File

@ -143,6 +143,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public Resource getUsedResources();
/**
* Get the currently utilized resources which allocated at nodes with label
* specified
*
* @return used resources by the queue and it's children
*/
public Resource getUsedResourceByLabel(String nodeLabel);
/**
* Get the current run-state of the queue
* @return current run-state

View File

@ -21,12 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -47,7 +49,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -92,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -966,6 +971,51 @@ public class CapacityScheduler extends
updateNodeResource(nm, resourceOption);
root.updateClusterResource(clusterResource);
}
/**
* Process node labels update on a node.
*
* TODO: Currently capacity scheduler will kill containers on a node when
* labels on the node changed. It is a simply solution to ensure guaranteed
* capacity on labels of queues. When YARN-2498 completed, we can let
* preemption policy to decide if such containers need to be killed or just
* keep them running.
*/
private synchronized void updateLabelsOnNode(NodeId nodeId,
Set<String> newLabels) {
FiCaSchedulerNode node = nodes.get(nodeId);
if (null == node) {
return;
}
// labels is same, we don't need do update
if (node.getLabels().size() == newLabels.size()
&& node.getLabels().containsAll(newLabels)) {
return;
}
// Kill running containers since label is changed
for (RMContainer rmContainer : node.getRunningContainers()) {
ContainerId containerId = rmContainer.getContainerId();
completedContainer(rmContainer,
ContainerStatus.newInstance(containerId,
ContainerState.COMPLETE,
String.format(
"Container=%s killed since labels on the node=%s changed",
containerId.toString(), nodeId.toString()),
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL);
}
// Unreserve container on this node
RMContainer reservedContainer = node.getReservedContainer();
if (null != reservedContainer) {
dropContainerReservation(reservedContainer);
}
// Update node labels after we've done this
node.updateLabels(newLabels);
}
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
@ -1049,6 +1099,19 @@ public class CapacityScheduler extends
nodeResourceUpdatedEvent.getResourceOption());
}
break;
case NODE_LABELS_UPDATE:
{
NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
(NodeLabelsUpdateSchedulerEvent) event;
for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
.getUpdatedNodeToLabels().entrySet()) {
NodeId id = entry.getKey();
Set<String> labels = entry.getValue();
updateLabelsOnNode(id, labels);
}
}
break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@ -1117,13 +1180,8 @@ public class CapacityScheduler extends
}
private synchronized void addNode(RMNode nodeManager) {
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName);
usePortForNodeName, nodeManager.getNodeLabels());
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
@ -1136,6 +1194,12 @@ public class CapacityScheduler extends
if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule();
}
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
}
private synchronized void removeNode(RMNode nodeInfo) {

View File

@ -730,7 +730,7 @@ public class LeafQueue extends AbstractCSQueue {
// if our queue cannot access this node, just return
if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
labelManager.getLabelsOnNode(node.getNodeID()))) {
node.getLabels())) {
return NULL_ASSIGNMENT;
}
@ -799,7 +799,7 @@ public class LeafQueue extends AbstractCSQueue {
// Check queue max-capacity limit
if (!canAssignToThisQueue(clusterResource, required,
labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
node.getLabels(), application, true)) {
return NULL_ASSIGNMENT;
}
@ -832,7 +832,7 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
labelManager.getLabelsOnNode(node.getNodeID()));
node.getLabels());
// Don't reset scheduling opportunities for non-local assignments
// otherwise the app will be delayed for each non-local assignment.
@ -1478,7 +1478,7 @@ public class LeafQueue extends AbstractCSQueue {
// check if the resource request can access the label
if (!SchedulerUtils.checkNodeLabelExpression(
labelManager.getLabelsOnNode(node.getNodeID()),
node.getLabels(),
request.getNodeLabelExpression())) {
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
@ -1669,8 +1669,7 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
if (removed) {
releaseResource(clusterResource, application,
container.getResource(),
labelManager.getLabelsOnNode(node.getNodeID()));
container.getResource(), node.getLabels());
LOG.info("completedContainer" +
" container=" + container +
" queue=" + this +
@ -1862,9 +1861,10 @@ public class LeafQueue extends AbstractCSQueue {
}
// Careful! Locking order is important!
synchronized (this) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt, rmContainer.getContainer()
.getResource(), labelManager.getLabelsOnNode(rmContainer
.getContainer().getNodeId()));
.getResource(), node.getLabels());
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
@ -1901,9 +1901,10 @@ public class LeafQueue extends AbstractCSQueue {
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
.getResource(), labelManager.getLabelsOnNode(rmContainer
.getContainer().getNodeId()));
.getResource(), node.getLabels());
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@ -1918,9 +1919,10 @@ public class LeafQueue extends AbstractCSQueue {
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
.getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
.getNodeId()));
.getResource(), node.getLabels());
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()

View File

@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
private final boolean rootQueue;
final Comparator<CSQueue> queueComparator;
volatile int numApplications;
private final CapacitySchedulerContext scheduler;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -80,7 +81,7 @@ public class ParentQueue extends AbstractCSQueue {
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
this.queueComparator = cs.getQueueComparator();
this.rootQueue = (parent == null);
@ -420,10 +421,10 @@ public class ParentQueue extends AbstractCSQueue {
Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
Set<String> nodeLabels = node.getLabels();
// if our queue cannot access this node, just return
if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
labelManager.getLabelsOnNode(node.getNodeID()))) {
if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
return assignment;
}
@ -434,7 +435,6 @@ public class ParentQueue extends AbstractCSQueue {
}
boolean localNeedToUnreserve = false;
Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
// Are we over maximum-capacity for this queue?
if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
@ -641,7 +641,7 @@ public class ParentQueue extends AbstractCSQueue {
// Book keeping
synchronized (this) {
super.releaseResource(clusterResource, rmContainer.getContainer()
.getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
.getResource(), node.getLabels());
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@ -703,9 +703,10 @@ public class ParentQueue extends AbstractCSQueue {
}
// Careful! Locking order is important!
synchronized (this) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
.getResource(), labelManager.getLabelsOnNode(rmContainer
.getContainer().getNodeId()));
.getResource(), node.getLabels());
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
@ -730,9 +731,10 @@ public class ParentQueue extends AbstractCSQueue {
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
.getResource(), labelManager.getLabelsOnNode(rmContainer
.getContainer().getNodeId()));
.getResource(), node.getLabels());
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@ -748,9 +750,11 @@ public class ParentQueue extends AbstractCSQueue {
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(),
labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
node.getLabels());
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="

View File

@ -19,12 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -32,9 +34,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
Set<String> nodeLabels) {
super(node, usePortForNodeName, nodeLabels);
}
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
super(node, usePortForNodeName);
this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
}
@Override

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId;
public class NodeLabelsUpdateSchedulerEvent extends SchedulerEvent {
private Map<NodeId, Set<String>> nodeToLabels;
public NodeLabelsUpdateSchedulerEvent(Map<NodeId, Set<String>> nodeToLabels) {
super(SchedulerEventType.NODE_LABELS_UPDATE);
this.nodeToLabels = nodeToLabels;
}
public Map<NodeId, Set<String>> getUpdatedNodeToLabels() {
return nodeToLabels;
}
}

View File

@ -25,6 +25,7 @@ public enum SchedulerEventType {
NODE_REMOVED,
NODE_UPDATE,
NODE_RESOURCE_UPDATE,
NODE_LABELS_UPDATE,
// Source: RMApp
APP_ADDED,

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -206,7 +207,7 @@ public class MockNodes {
@Override
public Set<String> getNodeLabels() {
return null;
return RMNodeLabelsManager.EMPTY_STRING_SET;
}
};

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
public class TestCapacitySchedulerNodeLabelUpdate {
private final int GB = 1024;
private YarnConfiguration conf;
RMNodeLabelsManager mgr;
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mgr = new MemoryRMNodeLabelsManager();
mgr.init(conf);
}
private Configuration getConfigurationWithQueueLabels(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 100);
conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
conf.setCapacityByLabel(A, "x", 100);
conf.setCapacityByLabel(A, "y", 100);
conf.setCapacityByLabel(A, "z", 100);
return conf;
}
private Set<String> toSet(String... elements) {
Set<String> set = Sets.newHashSet(elements);
return set;
}
private void checkUsedResource(MockRM rm, String queueName, int memory) {
checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
}
private void checkUsedResource(MockRM rm, String queueName, int memory,
String label) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = scheduler.getQueue(queueName);
Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory());
}
@Test (timeout = 30000)
public void testNodeUpdate() throws Exception {
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
// set mapping:
// h1 -> x
// h2 -> y
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
// inject node label manager
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 8000);
MockNM nm2 = rm.registerNode("h2:1234", 8000);
MockNM nm3 = rm.registerNode("h3:1234", 8000);
ContainerId containerId;
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
RMApp app1 = rm.submitApp(GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
// request a container.
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
// queue-a used x=1G, ""=1G
checkUsedResource(rm, "a", 1024, "x");
checkUsedResource(rm, "a", 1024);
// change h1's label to z, container should be killed
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
toSet("z")));
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.KILLED, 10 * 1000));
// check used resource:
// queue-a used x=0G, ""=1G ("" not changed)
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 1024);
// request a container with label = y
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
Assert.assertTrue(rm.waitForState(nm2, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
// queue-a used y=1G, ""=1G
checkUsedResource(rm, "a", 1024, "y");
checkUsedResource(rm, "a", 1024);
// change h2's label to no label, container should be killed
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
CommonNodeLabelsManager.EMPTY_STRING_SET));
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.KILLED, 10 * 1000));
// check used resource:
// queue-a used x=0G, y=0G, ""=1G ("" not changed)
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 1024);
containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// change h3's label to z, AM container should be killed
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
toSet("z")));
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.KILLED, 10 * 1000));
// check used resource:
// queue-a used x=0G, y=0G, ""=1G ("" not changed)
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 0);
rm.close();
}
}