From bb6c79f76c20e6ffb3fca32685602747916f452a Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 10 Oct 2014 11:44:21 -0700 Subject: [PATCH] YARN-2494. Added NodeLabels Manager internal API and implementation. Contributed by Wangda Tan. (cherry picked from commit db7f1653198b950e89567c06898d64f6b930a0ee) --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 11 + .../nodelabels/CommonNodeLabelsManager.java | 722 ++++++++++++++++++ .../nodelabels/FileSystemNodeLabelsStore.java | 255 +++++++ .../yarn/nodelabels/NodeLabelsStore.java | 69 ++ .../event/NodeLabelsStoreEvent.java | 28 + .../event/NodeLabelsStoreEventType.java | 25 + .../event/RemoveClusterNodeLabels.java | 34 + .../event/StoreNewClusterNodeLabels.java | 34 + .../UpdateNodeToLabelsMappingsEvent.java | 37 + .../DummyCommonNodeLabelsManager.java | 81 ++ .../yarn/nodelabels/NodeLabelTestBase.java | 76 ++ .../TestCommonNodeLabelsManager.java | 261 +++++++ .../TestFileSystemNodeLabelsStore.java | 252 ++++++ .../nodelabels/RMNodeLabelsManager.java | 447 +++++++++++ .../nodelabels/DummyRMNodeLabelsManager.java | 83 ++ .../nodelabels/TestRMNodeLabelsManager.java | 367 +++++++++ 17 files changed, 2785 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ec31f8fb290..2e3c2860ce3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -93,6 +93,9 @@ Release 2.6.0 - UNRELEASED YARN-2544. Added admin-API objects for using node-labels. (Wangda Tan via vinodkv) + YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda + Tan via vinodkv) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6574f51ae09..b1b0dc02f95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1376,6 +1376,17 @@ public class YarnConfiguration extends Configuration { public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy"; public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY .name(); + + public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels."; + + /** URI for NodeLabelManager */ + public static final String FS_NODE_LABELS_STORE_URI = NODE_LABELS_PREFIX + + "fs-store.uri"; + public static final String DEFAULT_FS_NODE_LABELS_STORE_URI = "file:///tmp/"; + public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = + NODE_LABELS_PREFIX + "fs-store.retry-policy-spec"; + public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = + "2000, 500"; public YarnConfiguration() { super(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java new file mode 100644 index 00000000000..89fbf09bca9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -0,0 +1,722 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels; +import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEvent; +import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEventType; +import org.apache.hadoop.yarn.nodelabels.event.RemoveClusterNodeLabels; +import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.ImmutableSet; + +public class CommonNodeLabelsManager extends AbstractService { + protected static final Log LOG = LogFactory.getLog(CommonNodeLabelsManager.class); + private static final int MAX_LABEL_LENGTH = 255; + public static final Set EMPTY_STRING_SET = Collections + .unmodifiableSet(new HashSet(0)); + public static final String ANY = "*"; + public static final Set ACCESS_ANY_LABEL_SET = ImmutableSet.of(ANY); + private static final Pattern LABEL_PATTERN = Pattern + .compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*"); + public static final int WILDCARD_PORT = 0; + + /** + * If a user doesn't specify label of a queue or node, it belongs + * DEFAULT_LABEL + */ + public static final String NO_LABEL = ""; + + protected Dispatcher dispatcher; + + protected ConcurrentMap labelCollections = + new ConcurrentHashMap(); + protected ConcurrentMap nodeCollections = + new ConcurrentHashMap(); + + protected final ReadLock readLock; + protected final WriteLock writeLock; + + protected NodeLabelsStore store; + + protected static class Label { + public Resource resource; + + protected Label() { + this.resource = Resource.newInstance(0, 0); + } + } + + /** + * A Host can have multiple Nodes + */ + protected static class Host { + public Set labels; + public Map nms; + + protected Host() { + labels = + Collections.newSetFromMap(new ConcurrentHashMap()); + nms = new ConcurrentHashMap(); + } + + public Host copy() { + Host c = new Host(); + c.labels = new HashSet(labels); + for (Entry entry : nms.entrySet()) { + c.nms.put(entry.getKey(), entry.getValue().copy()); + } + return c; + } + } + + protected static class Node { + public Set labels; + public Resource resource; + public boolean running; + + protected Node() { + labels = null; + resource = Resource.newInstance(0, 0); + running = false; + } + + public Node copy() { + Node c = new Node(); + if (labels != null) { + c.labels = + Collections.newSetFromMap(new ConcurrentHashMap()); + } else { + c.labels = null; + } + c.resource = Resources.clone(resource); + c.running = running; + return c; + } + } + + private final class ForwardingEventHandler implements + EventHandler { + + @Override + public void handle(NodeLabelsStoreEvent event) { + if (isInState(STATE.STARTED)) { + handleStoreEvent(event); + } + } + } + + // Dispatcher related code + protected void handleStoreEvent(NodeLabelsStoreEvent event) { + try { + switch (event.getType()) { + case ADD_LABELS: + StoreNewClusterNodeLabels storeNewClusterNodeLabelsEvent = + (StoreNewClusterNodeLabels) event; + store.storeNewClusterNodeLabels(storeNewClusterNodeLabelsEvent + .getLabels()); + break; + case REMOVE_LABELS: + RemoveClusterNodeLabels removeClusterNodeLabelsEvent = + (RemoveClusterNodeLabels) event; + store.removeClusterNodeLabels(removeClusterNodeLabelsEvent.getLabels()); + break; + case STORE_NODE_TO_LABELS: + UpdateNodeToLabelsMappingsEvent updateNodeToLabelsMappingsEvent = + (UpdateNodeToLabelsMappingsEvent) event; + store.updateNodeToLabelsMappings(updateNodeToLabelsMappingsEvent + .getNodeToLabels()); + break; + } + } catch (IOException e) { + LOG.error("Failed to store label modification to storage"); + throw new YarnRuntimeException(e); + } + } + + public CommonNodeLabelsManager() { + super(CommonNodeLabelsManager.class.getName()); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + // for UT purpose + protected void initDispatcher(Configuration conf) { + // create async handler + dispatcher = new AsyncDispatcher(); + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.init(conf); + asyncDispatcher.setDrainEventsOnStop(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + initNodeLabelStore(conf); + + labelCollections.put(NO_LABEL, new Label()); + } + + protected void initNodeLabelStore(Configuration conf) throws Exception { + this.store = new FileSystemNodeLabelsStore(this); + this.store.init(conf); + this.store.recover(); + } + + // for UT purpose + protected void startDispatcher() { + // start dispatcher + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.start(); + } + + @Override + protected void serviceStart() throws Exception { + // init dispatcher only when service start, because recover will happen in + // service init, we don't want to trigger any event handling at that time. + initDispatcher(getConfig()); + + dispatcher.register(NodeLabelsStoreEventType.class, + new ForwardingEventHandler()); + + startDispatcher(); + } + + // for UT purpose + protected void stopDispatcher() { + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.stop(); + } + + @Override + protected void serviceStop() throws Exception { + // finalize store + stopDispatcher(); + store.close(); + } + + /** + * Add multiple node labels to repository + * + * @param labels + * new node labels added + */ + @SuppressWarnings("unchecked") + public void addToCluserNodeLabels(Set labels) throws IOException { + if (null == labels || labels.isEmpty()) { + return; + } + + labels = normalizeLabels(labels); + + // do a check before actual adding them, will throw exception if any of them + // doesn't meet label name requirement + for (String label : labels) { + checkAndThrowLabelName(label); + } + + for (String label : labels) { + this.labelCollections.put(label, new Label()); + } + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new StoreNewClusterNodeLabels(labels)); + } + + LOG.info("Add labels: [" + StringUtils.join(labels.iterator(), ",") + "]"); + } + + protected void checkAddLabelsToNode( + Map> addedLabelsToNode) throws IOException { + if (null == addedLabelsToNode || addedLabelsToNode.isEmpty()) { + return; + } + + // check all labels being added existed + Set knownLabels = labelCollections.keySet(); + for (Entry> entry : addedLabelsToNode.entrySet()) { + if (!knownLabels.containsAll(entry.getValue())) { + String msg = + "Not all labels being added contained by known " + + "label collections, please check" + ", added labels=[" + + StringUtils.join(entry.getValue(), ",") + "]"; + LOG.error(msg); + throw new IOException(msg); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalAddLabelsToNode( + Map> addedLabelsToNode) throws IOException { + // do add labels to nodes + Map> newNMToLabels = + new HashMap>(); + for (Entry> entry : addedLabelsToNode.entrySet()) { + NodeId nodeId = entry.getKey(); + Set labels = entry.getValue(); + + createNodeIfNonExisted(entry.getKey()); + + if (nodeId.getPort() == WILDCARD_PORT) { + Host host = nodeCollections.get(nodeId.getHost()); + host.labels.addAll(labels); + newNMToLabels.put(nodeId, host.labels); + } else { + Node nm = getNMInNodeSet(nodeId); + if (nm.labels == null) { + nm.labels = new HashSet(); + } + nm.labels.addAll(labels); + newNMToLabels.put(nodeId, nm.labels); + } + } + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new UpdateNodeToLabelsMappingsEvent(newNMToLabels)); + } + + // shows node->labels we added + LOG.info("addLabelsToNode:"); + for (Entry> entry : newNMToLabels.entrySet()) { + LOG.info(" NM=" + entry.getKey() + ", labels=[" + + StringUtils.join(entry.getValue().iterator(), ",") + "]"); + } + } + + /** + * add more labels to nodes + * + * @param addedLabelsToNode node -> labels map + */ + public void addLabelsToNode(Map> addedLabelsToNode) + throws IOException { + checkAddLabelsToNode(addedLabelsToNode); + internalAddLabelsToNode(addedLabelsToNode); + } + + protected void checkRemoveFromClusterNodeLabels( + Collection labelsToRemove) throws IOException { + if (null == labelsToRemove || labelsToRemove.isEmpty()) { + return; + } + + // Check if label to remove doesn't existed or null/empty, will throw + // exception if any of labels to remove doesn't meet requirement + for (String label : labelsToRemove) { + label = normalizeLabel(label); + if (label == null || label.isEmpty()) { + throw new IOException("Label to be removed is null or empty"); + } + + if (!labelCollections.containsKey(label)) { + throw new IOException("Node label=" + label + + " to be removed doesn't existed in cluster " + + "node labels collection."); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalRemoveFromClusterNodeLabels(Collection labelsToRemove) { + // remove labels from nodes + for (String nodeName : nodeCollections.keySet()) { + Host host = nodeCollections.get(nodeName); + if (null != host) { + host.labels.removeAll(labelsToRemove); + for (Node nm : host.nms.values()) { + if (nm.labels != null) { + nm.labels.removeAll(labelsToRemove); + } + } + } + } + + // remove labels from node labels collection + for (String label : labelsToRemove) { + labelCollections.remove(label); + } + + // create event to remove labels + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new RemoveClusterNodeLabels(labelsToRemove)); + } + + LOG.info("Remove labels: [" + + StringUtils.join(labelsToRemove.iterator(), ",") + "]"); + } + + /** + * Remove multiple node labels from repository + * + * @param labelsToRemove + * node labels to remove + * @throws IOException + */ + public void removeFromClusterNodeLabels(Collection labelsToRemove) + throws IOException { + checkRemoveFromClusterNodeLabels(labelsToRemove); + + internalRemoveFromClusterNodeLabels(labelsToRemove); + } + + protected void checkRemoveLabelsFromNode( + Map> removeLabelsFromNode) throws IOException { + // check all labels being added existed + Set knownLabels = labelCollections.keySet(); + for (Entry> entry : removeLabelsFromNode.entrySet()) { + NodeId nodeId = entry.getKey(); + Set labels = entry.getValue(); + + if (!knownLabels.containsAll(labels)) { + String msg = + "Not all labels being removed contained by known " + + "label collections, please check" + ", removed labels=[" + + StringUtils.join(labels, ",") + "]"; + LOG.error(msg); + throw new IOException(msg); + } + + Set originalLabels = null; + + boolean nodeExisted = false; + if (WILDCARD_PORT != nodeId.getPort()) { + Node nm = getNMInNodeSet(nodeId); + if (nm != null) { + originalLabels = nm.labels; + nodeExisted = true; + } + } else { + Host host = nodeCollections.get(nodeId.getHost()); + if (null != host) { + originalLabels = host.labels; + nodeExisted = true; + } + } + + if (!nodeExisted) { + String msg = + "Try to remove labels from NM=" + nodeId + + ", but the NM doesn't existed"; + LOG.error(msg); + throw new IOException(msg); + } + + if (labels == null || labels.isEmpty()) { + continue; + } + + if (!originalLabels.containsAll(labels)) { + String msg = + "Try to remove labels = [" + StringUtils.join(labels, ",") + + "], but not all labels contained by NM=" + nodeId; + LOG.error(msg); + throw new IOException(msg); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalRemoveLabelsFromNode( + Map> removeLabelsFromNode) { + // do remove labels from nodes + Map> newNMToLabels = + new HashMap>(); + for (Entry> entry : removeLabelsFromNode.entrySet()) { + NodeId nodeId = entry.getKey(); + Set labels = entry.getValue(); + + if (nodeId.getPort() == WILDCARD_PORT) { + Host host = nodeCollections.get(nodeId.getHost()); + host.labels.removeAll(labels); + newNMToLabels.put(nodeId, host.labels); + } else { + Node nm = getNMInNodeSet(nodeId); + if (nm.labels != null) { + nm.labels.removeAll(labels); + newNMToLabels.put(nodeId, nm.labels); + } + } + } + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new UpdateNodeToLabelsMappingsEvent(newNMToLabels)); + } + + // shows node->labels we added + LOG.info("removeLabelsFromNode:"); + for (Entry> entry : newNMToLabels.entrySet()) { + LOG.info(" NM=" + entry.getKey() + ", labels=[" + + StringUtils.join(entry.getValue().iterator(), ",") + "]"); + } + } + + /** + * remove labels from nodes, labels being removed most be contained by these + * nodes + * + * @param removeLabelsFromNode node -> labels map + */ + public void + removeLabelsFromNode(Map> removeLabelsFromNode) + throws IOException { + checkRemoveLabelsFromNode(removeLabelsFromNode); + + internalRemoveLabelsFromNode(removeLabelsFromNode); + } + + protected void checkReplaceLabelsOnNode( + Map> replaceLabelsToNode) throws IOException { + if (null == replaceLabelsToNode || replaceLabelsToNode.isEmpty()) { + return; + } + + // check all labels being added existed + Set knownLabels = labelCollections.keySet(); + for (Entry> entry : replaceLabelsToNode.entrySet()) { + if (!knownLabels.containsAll(entry.getValue())) { + String msg = + "Not all labels being replaced contained by known " + + "label collections, please check" + ", new labels=[" + + StringUtils.join(entry.getValue(), ",") + "]"; + LOG.error(msg); + throw new IOException(msg); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalReplaceLabelsOnNode( + Map> replaceLabelsToNode) { + // do replace labels to nodes + Map> newNMToLabels = new HashMap>(); + for (Entry> entry : replaceLabelsToNode.entrySet()) { + NodeId nodeId = entry.getKey(); + Set labels = entry.getValue(); + + // update nodeCollections + createNodeIfNonExisted(entry.getKey()); + if (nodeId.getPort() == WILDCARD_PORT) { + Host host = nodeCollections.get(nodeId.getHost()); + host.labels.clear(); + host.labels.addAll(labels); + newNMToLabels.put(nodeId, host.labels); + } else { + Node nm = getNMInNodeSet(nodeId); + if (nm.labels == null) { + nm.labels = new HashSet(); + } + nm.labels.clear(); + nm.labels.addAll(labels); + newNMToLabels.put(nodeId, nm.labels); + } + } + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new UpdateNodeToLabelsMappingsEvent(newNMToLabels)); + } + + // shows node->labels we added + LOG.info("setLabelsToNode:"); + for (Entry> entry : newNMToLabels.entrySet()) { + LOG.info(" NM=" + entry.getKey() + ", labels=[" + + StringUtils.join(entry.getValue().iterator(), ",") + "]"); + } + } + + /** + * replace labels to nodes + * + * @param replaceLabelsToNode node -> labels map + */ + public void replaceLabelsOnNode(Map> replaceLabelsToNode) + throws IOException { + checkReplaceLabelsOnNode(replaceLabelsToNode); + + internalReplaceLabelsOnNode(replaceLabelsToNode); + } + + /** + * Get mapping of nodes to labels + * + * @return nodes to labels map + */ + public Map> getNodeLabels() { + try { + readLock.lock(); + Map> nodeToLabels = + new HashMap>(); + for (Entry entry : nodeCollections.entrySet()) { + String hostName = entry.getKey(); + Host host = entry.getValue(); + for (NodeId nodeId : host.nms.keySet()) { + Set nodeLabels = getLabelsByNode(nodeId); + if (nodeLabels == null || nodeLabels.isEmpty()) { + continue; + } + nodeToLabels.put(nodeId, nodeLabels); + } + if (!host.labels.isEmpty()) { + nodeToLabels + .put(NodeId.newInstance(hostName, WILDCARD_PORT), host.labels); + } + } + return Collections.unmodifiableMap(nodeToLabels); + } finally { + readLock.unlock(); + } + } + + /** + * Get existing valid labels in repository + * + * @return existing valid labels in repository + */ + public Set getClusterNodeLabels() { + try { + readLock.lock(); + Set labels = new HashSet(labelCollections.keySet()); + labels.remove(NO_LABEL); + return Collections.unmodifiableSet(labels); + } finally { + readLock.unlock(); + } + } + + private void checkAndThrowLabelName(String label) throws IOException { + if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { + throw new IOException("label added is empty or exceeds " + + MAX_LABEL_LENGTH + " character(s)"); + } + label = label.trim(); + + boolean match = LABEL_PATTERN.matcher(label).matches(); + + if (!match) { + throw new IOException("label name should only contains " + + "{0-9, a-z, A-Z, -, _} and should not started with {-,_}" + + ", now it is=" + label); + } + } + + protected String normalizeLabel(String label) { + if (label != null) { + return label.trim(); + } + return NO_LABEL; + } + + private Set normalizeLabels(Set labels) { + Set newLabels = new HashSet(); + for (String label : labels) { + newLabels.add(normalizeLabel(label)); + } + return newLabels; + } + + protected Node getNMInNodeSet(NodeId nodeId) { + return getNMInNodeSet(nodeId, nodeCollections); + } + + protected Node getNMInNodeSet(NodeId nodeId, Map map) { + return getNMInNodeSet(nodeId, map, false); + } + + protected Node getNMInNodeSet(NodeId nodeId, Map map, + boolean checkRunning) { + if (WILDCARD_PORT == nodeId.getPort()) { + return null; + } + + Host host = map.get(nodeId.getHost()); + if (null == host) { + return null; + } + Node nm = host.nms.get(nodeId); + if (null == nm) { + return null; + } + if (checkRunning) { + return nm.running ? nm : null; + } + return nm; + } + + protected Set getLabelsByNode(NodeId nodeId) { + return getLabelsByNode(nodeId, nodeCollections); + } + + protected Set getLabelsByNode(NodeId nodeId, Map map) { + Host host = map.get(nodeId.getHost()); + if (null == host) { + return EMPTY_STRING_SET; + } + Node nm = host.nms.get(nodeId); + if (null != nm && null != nm.labels) { + return nm.labels; + } else { + return host.labels; + } + } + + protected void createNodeIfNonExisted(NodeId nodeId) { + Host host = nodeCollections.get(nodeId.getHost()); + if (null == host) { + host = new Host(); + nodeCollections.put(nodeId.getHost(), host); + } + if (nodeId.getPort() != WILDCARD_PORT) { + Node nm = host.nms.get(nodeId); + if (null == nm) { + host.nms.put(nodeId, new Node()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java new file mode 100644 index 00000000000..2778c742a2d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; + +import com.google.common.collect.Sets; + +public class FileSystemNodeLabelsStore extends NodeLabelsStore { + + public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) { + super(mgr); + } + + protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class); + + protected static final String ROOT_DIR_NAME = "FSNodeLabelManagerRoot"; + protected static final String MIRROR_FILENAME = "nodelabel.mirror"; + protected static final String EDITLOG_FILENAME = "nodelabel.editlog"; + + protected enum SerializedLogType { + ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS + } + + Path fsWorkingPath; + Path rootDirPath; + FileSystem fs; + FSDataOutputStream editlogOs; + Path editLogPath; + + @Override + public void init(Configuration conf) throws Exception { + fsWorkingPath = + new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_URI, + YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_URI)); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + + setFileSystem(conf); + + // mkdir of root dir path + fs.mkdirs(rootDirPath); + } + + @Override + public void close() throws IOException { + try { + fs.close(); + editlogOs.close(); + } catch (IOException e) { + LOG.warn("Exception happened whiling shutting down,", e); + } + } + + private void setFileSystem(Configuration conf) throws IOException { + Configuration confCopy = new Configuration(conf); + confCopy.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = + confCopy.get(YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC); + confCopy.set("dfs.client.retry.policy.spec", retryPolicy); + fs = fsWorkingPath.getFileSystem(confCopy); + + // if it's local file system, use RawLocalFileSystem instead of + // LocalFileSystem, the latter one doesn't support append. + if (fs.getScheme().equals("file")) { + fs = ((LocalFileSystem)fs).getRaw(); + } + } + + private void ensureAppendEditlogFile() throws IOException { + editlogOs = fs.append(editLogPath); + } + + private void ensureCloseEditlogFile() throws IOException { + editlogOs.close(); + } + + @Override + public void updateNodeToLabelsMappings( + Map> nodeToLabels) throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal()); + ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest + .newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void storeNewClusterNodeLabels(Set labels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal()); + ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest.newInstance(labels)).getProto() + .writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void removeClusterNodeLabels(Collection labels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal()); + ((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets + .newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void recover() throws IOException { + /* + * Steps of recover + * 1) Read from last mirror (from mirror or mirror.old) + * 2) Read from last edit log, and apply such edit log + * 3) Write new mirror to mirror.writing + * 4) Rename mirror to mirror.old + * 5) Move mirror.writing to mirror + * 6) Remove mirror.old + * 7) Remove edit log and create a new empty edit log + */ + + // Open mirror from serialized file + Path mirrorPath = new Path(rootDirPath, MIRROR_FILENAME); + Path oldMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".old"); + + FSDataInputStream is = null; + if (fs.exists(mirrorPath)) { + is = fs.open(mirrorPath); + } else if (fs.exists(oldMirrorPath)) { + is = fs.open(oldMirrorPath); + } + + if (null != is) { + Set labels = + new AddToClusterNodeLabelsRequestPBImpl( + AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels(); + Map> nodeToLabels = + new ReplaceLabelsOnNodeRequestPBImpl( + ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) + .getNodeToLabels(); + mgr.addToCluserNodeLabels(labels); + mgr.replaceLabelsOnNode(nodeToLabels); + is.close(); + } + + // Open and process editlog + editLogPath = new Path(rootDirPath, EDITLOG_FILENAME); + if (fs.exists(editLogPath)) { + is = fs.open(editLogPath); + + while (true) { + try { + // read edit log one by one + SerializedLogType type = SerializedLogType.values()[is.readInt()]; + + switch (type) { + case ADD_LABELS: { + Collection labels = + AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is) + .getNodeLabelsList(); + mgr.addToCluserNodeLabels(Sets.newHashSet(labels.iterator())); + break; + } + case REMOVE_LABELS: { + Collection labels = + RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(is) + .getNodeLabelsList(); + mgr.removeFromClusterNodeLabels(labels); + break; + } + case NODE_TO_LABELS: { + Map> map = + new ReplaceLabelsOnNodeRequestPBImpl( + ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) + .getNodeToLabels(); + mgr.replaceLabelsOnNode(map); + break; + } + } + } catch (EOFException e) { + // EOF hit, break + break; + } + } + } + + // Serialize current mirror to mirror.writing + Path writingMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".writing"); + FSDataOutputStream os = fs.create(writingMirrorPath, true); + ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl + .newInstance(mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os); + ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest + .newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os); + os.close(); + + // Move mirror to mirror.old + if (fs.exists(mirrorPath)) { + fs.delete(oldMirrorPath, false); + fs.rename(mirrorPath, oldMirrorPath); + } + + // move mirror.writing to mirror + fs.rename(writingMirrorPath, mirrorPath); + fs.delete(writingMirrorPath, false); + + // remove mirror.old + fs.delete(oldMirrorPath, false); + + // create a new editlog file + editlogOs = fs.create(editLogPath, true); + editlogOs.close(); + + LOG.info("Finished write mirror at:" + mirrorPath.toString()); + LOG.info("Finished create editlog file at:" + editLogPath.toString()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java new file mode 100644 index 00000000000..033c0343ce0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; + +public abstract class NodeLabelsStore implements Closeable { + protected final CommonNodeLabelsManager mgr; + protected Configuration conf; + + public NodeLabelsStore(CommonNodeLabelsManager mgr) { + this.mgr = mgr; + } + + /** + * Store node -> label + */ + public abstract void updateNodeToLabelsMappings( + Map> nodeToLabels) throws IOException; + + /** + * Store new labels + */ + public abstract void storeNewClusterNodeLabels(Set label) + throws IOException; + + /** + * Remove labels + */ + public abstract void removeClusterNodeLabels(Collection labels) + throws IOException; + + /** + * Recover labels and node to labels mappings from store + * @param conf + */ + public abstract void recover() throws IOException; + + public void init(Configuration conf) throws Exception { + this.conf = conf; + } + + public CommonNodeLabelsManager getNodeLabelsManager() { + return mgr; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java new file mode 100644 index 00000000000..b1b7f1140e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class NodeLabelsStoreEvent extends + AbstractEvent { + public NodeLabelsStoreEvent(NodeLabelsStoreEventType type) { + super(type); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java new file mode 100644 index 00000000000..efa2dbebac8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels.event; + +public enum NodeLabelsStoreEventType { + REMOVE_LABELS, + ADD_LABELS, + STORE_NODE_TO_LABELS +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java new file mode 100644 index 00000000000..ae78394d01c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels.event; + +import java.util.Collection; + +public class RemoveClusterNodeLabels extends NodeLabelsStoreEvent { + private Collection labels; + + public RemoveClusterNodeLabels(Collection labels) { + super(NodeLabelsStoreEventType.REMOVE_LABELS); + this.labels = labels; + } + + public Collection getLabels() { + return labels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java new file mode 100644 index 00000000000..b478c6bfb11 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels.event; + +import java.util.Set; + +public class StoreNewClusterNodeLabels extends NodeLabelsStoreEvent { + private Set labels; + + public StoreNewClusterNodeLabels(Set labels) { + super(NodeLabelsStoreEventType.ADD_LABELS); + this.labels = labels; + } + + public Set getLabels() { + return labels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java new file mode 100644 index 00000000000..27eeb81ab33 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels.event; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeId; + +public class UpdateNodeToLabelsMappingsEvent extends NodeLabelsStoreEvent { + private Map> nodeToLabels; + + public UpdateNodeToLabelsMappingsEvent(Map> nodeToLabels) { + super(NodeLabelsStoreEventType.STORE_NODE_TO_LABELS); + this.nodeToLabels = nodeToLabels; + } + + public Map> getNodeToLabels() { + return nodeToLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java new file mode 100644 index 00000000000..fcdf96946c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.InlineDispatcher; + +public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager { + Map> lastNodeToLabels = null; + Collection lastAddedlabels = null; + Collection lastRemovedlabels = null; + + @Override + public void initNodeLabelStore(Configuration conf) { + this.store = new NodeLabelsStore(this) { + + @Override + public void recover() throws IOException { + } + + @Override + public void removeClusterNodeLabels(Collection labels) + throws IOException { + lastRemovedlabels = labels; + } + + @Override + public void updateNodeToLabelsMappings( + Map> nodeToLabels) throws IOException { + lastNodeToLabels = nodeToLabels; + } + + @Override + public void storeNewClusterNodeLabels(Set label) throws IOException { + lastAddedlabels = label; + } + + @Override + public void close() throws IOException { + // do nothing + } + }; + } + + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java new file mode 100644 index 00000000000..9749299c375 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.junit.Assert; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class NodeLabelTestBase { + public static void assertMapEquals(Map> m1, + ImmutableMap> m2) { + Assert.assertEquals(m1.size(), m2.size()); + for (NodeId k : m1.keySet()) { + Assert.assertTrue(m2.containsKey(k)); + assertCollectionEquals(m1.get(k), m2.get(k)); + } + } + + public static void assertMapContains(Map> m1, + ImmutableMap> m2) { + for (NodeId k : m2.keySet()) { + Assert.assertTrue(m1.containsKey(k)); + assertCollectionEquals(m1.get(k), m2.get(k)); + } + } + + public static void assertCollectionEquals(Collection c1, + Collection c2) { + Assert.assertEquals(c1.size(), c2.size()); + Iterator i1 = c1.iterator(); + Iterator i2 = c2.iterator(); + while (i1.hasNext()) { + Assert.assertEquals(i1.next(), i2.next()); + } + } + + public static Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + public NodeId toNodeId(String str) { + if (str.contains(":")) { + int idx = str.indexOf(':'); + NodeId id = + NodeId.newInstance(str.substring(0, idx), + Integer.valueOf(str.substring(idx + 1))); + return id; + } else { + return NodeId.newInstance(str, CommonNodeLabelsManager.WILDCARD_PORT); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java new file mode 100644 index 00000000000..ea29f3acc58 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestCommonNodeLabelsManager extends NodeLabelTestBase { + DummyCommonNodeLabelsManager mgr = null; + + @Before + public void before() { + mgr = new DummyCommonNodeLabelsManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test(timeout = 5000) + public void testAddRemovelabel() throws Exception { + // Add some label + mgr.addToCluserNodeLabels(ImmutableSet.of("hello")); + assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("hello")); + + mgr.addToCluserNodeLabels(ImmutableSet.of("world")); + mgr.addToCluserNodeLabels(toSet("hello1", "world1")); + assertCollectionEquals(mgr.lastAddedlabels, + Sets.newHashSet("hello1", "world1")); + + Assert.assertTrue(mgr.getClusterNodeLabels().containsAll( + Sets.newHashSet("hello", "world", "hello1", "world1"))); + + // try to remove null, empty and non-existed label, should fail + for (String p : Arrays.asList(null, CommonNodeLabelsManager.NO_LABEL, "xx")) { + boolean caught = false; + try { + mgr.removeFromClusterNodeLabels(Arrays.asList(p)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("remove label should fail " + + "when label is null/empty/non-existed", caught); + } + + // Remove some label + mgr.removeFromClusterNodeLabels(Arrays.asList("hello")); + assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("hello")); + Assert.assertTrue(mgr.getClusterNodeLabels().containsAll( + Arrays.asList("world", "hello1", "world1"))); + + mgr.removeFromClusterNodeLabels(Arrays + .asList("hello1", "world1", "world")); + Assert.assertTrue(mgr.lastRemovedlabels.containsAll(Sets.newHashSet( + "hello1", "world1", "world"))); + Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty()); + } + + @Test(timeout = 5000) + public void testAddlabelWithCase() throws Exception { + // Add some label, case will not ignore here + mgr.addToCluserNodeLabels(ImmutableSet.of("HeLlO")); + assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("HeLlO")); + Assert.assertFalse(mgr.getClusterNodeLabels().containsAll(Arrays.asList("hello"))); + } + + @Test(timeout = 5000) + public void testAddInvalidlabel() throws IOException { + boolean caught = false; + try { + Set set = new HashSet(); + set.add(null); + mgr.addToCluserNodeLabels(set); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("null label should not add to repo", caught); + + caught = false; + try { + mgr.addToCluserNodeLabels(ImmutableSet.of(CommonNodeLabelsManager.NO_LABEL)); + } catch (IOException e) { + caught = true; + } + + Assert.assertTrue("empty label should not add to repo", caught); + + caught = false; + try { + mgr.addToCluserNodeLabels(ImmutableSet.of("-?")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("invalid label charactor should not add to repo", caught); + + caught = false; + try { + mgr.addToCluserNodeLabels(ImmutableSet.of(StringUtils.repeat("c", 257))); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("too long label should not add to repo", caught); + + caught = false; + try { + mgr.addToCluserNodeLabels(ImmutableSet.of("-aaabbb")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("label cannot start with \"-\"", caught); + + caught = false; + try { + mgr.addToCluserNodeLabels(ImmutableSet.of("_aaabbb")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("label cannot start with \"_\"", caught); + + caught = false; + try { + mgr.addToCluserNodeLabels(ImmutableSet.of("a^aabbb")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("label cannot contains other chars like ^[] ...", caught); + + caught = false; + try { + mgr.addToCluserNodeLabels(ImmutableSet.of("aa[a]bbb")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("label cannot contains other chars like ^[] ...", caught); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 5000) + public void testAddReplaceRemoveLabelsOnNodes() throws Exception { + // set a label on a node, but label doesn't exist + boolean caught = false; + try { + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("node"), toSet("label"))); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("trying to set a label to a node but " + + "label doesn't exist in repository should fail", caught); + + // set a label on a node, but node is null or empty + try { + mgr.replaceLabelsOnNode(ImmutableMap.of( + toNodeId(CommonNodeLabelsManager.NO_LABEL), toSet("label"))); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("trying to add a empty node but succeeded", caught); + + // set node->label one by one + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p3"))); + assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"), + toSet("p2"), toNodeId("n2"), toSet("p3"))); + assertMapEquals(mgr.lastNodeToLabels, + ImmutableMap.of(toNodeId("n2"), toSet("p3"))); + + // set bunch of node->label + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"), + toNodeId("n1"), toSet("p1"))); + assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"), + toSet("p1"), toNodeId("n2"), toSet("p3"), toNodeId("n3"), toSet("p3"))); + assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n3"), + toSet("p3"), toNodeId("n1"), toSet("p1"))); + + /* + * n1: p1 + * n2: p3 + * n3: p3 + */ + + // remove label on node + mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"))); + assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p3"), toNodeId("n3"), toSet("p3"))); + assertMapEquals(mgr.lastNodeToLabels, + ImmutableMap.of(toNodeId("n1"), CommonNodeLabelsManager.EMPTY_STRING_SET)); + + // add label on node + mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"))); + assertMapEquals( + mgr.getNodeLabels(), + ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"), + toSet("p2", "p3"), toNodeId("n3"), toSet("p3"))); + assertMapEquals(mgr.lastNodeToLabels, + ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"), + toSet("p2", "p3"))); + + // remove labels on node + mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2", "p3"), toNodeId("n3"), toSet("p3"))); + Assert.assertEquals(0, mgr.getNodeLabels().size()); + assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n1"), + CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n2"), + CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n3"), + CommonNodeLabelsManager.EMPTY_STRING_SET)); + } + + @Test(timeout = 5000) + public void testRemovelabelWithNodes() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p2"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n3"), toSet("p3"))); + + mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1")); + assertMapEquals(mgr.getNodeLabels(), + ImmutableMap.of(toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3"))); + assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p1")); + + mgr.removeFromClusterNodeLabels(ImmutableSet.of("p2", "p3")); + Assert.assertTrue(mgr.getNodeLabels().isEmpty()); + Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty()); + assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p2", "p3")); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java new file mode 100644 index 00000000000..a7546cb7040 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase { + MockNodeLabelManager mgr = null; + Configuration conf = null; + + private static class MockNodeLabelManager extends + CommonNodeLabelsManager { + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } + } + + private FileSystemNodeLabelsStore getStore() { + return (FileSystemNodeLabelsStore) mgr.store; + } + + @Before + public void before() throws IOException { + mgr = new MockNodeLabelManager(); + conf = new Configuration(); + File tempDir = File.createTempFile("nlb", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_URI, + tempDir.getAbsolutePath()); + mgr.init(conf); + mgr.start(); + } + + @After + public void after() throws IOException { + getStore().fs.delete(getStore().rootDirPath, true); + mgr.stop(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 10000) + public void testRecoverWithMirror() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.addToCluserNodeLabels(toSet("p4")); + mgr.addToCluserNodeLabels(toSet("p5", "p6")); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"))); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"), + toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"), + toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6"))); + + /* + * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7 + */ + + mgr.removeFromClusterNodeLabels(toSet("p1")); + mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5")); + + /* + * After removed p2: n2 p4: n4 p6: n6, n7 + */ + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockNodeLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabels().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + + // stutdown mgr and start a new mgr + mgr.stop(); + mgr = new MockNodeLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabels().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + mgr.stop(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 10000) + public void testEditlogRecover() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.addToCluserNodeLabels(toSet("p4")); + mgr.addToCluserNodeLabels(toSet("p5", "p6")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"))); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"), + toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"), + toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6"))); + + /* + * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7 + */ + + mgr.removeFromClusterNodeLabels(toSet("p1")); + mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5")); + + /* + * After removed p2: n2 p4: n4 p6: n6, n7 + */ + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockNodeLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabels().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + mgr.stop(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test//(timeout = 10000) + public void testSerilizationAfterRecovery() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.addToCluserNodeLabels(toSet("p4")); + mgr.addToCluserNodeLabels(toSet("p5", "p6")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"))); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"), + toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"), + toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6"))); + + /* + * node -> labels + * p1: n1 + * p2: n2 + * p3: n3 + * p4: n4 + * p5: n5 + * p6: n6, n7 + */ + + mgr.removeFromClusterNodeLabels(toSet("p1")); + mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5")); + + /* + * After removed + * p2: n2 + * p4: n4 + * p6: n6, n7 + */ + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabels().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + + /* + * Add label p7,p8 then shutdown + */ + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + mgr.addToCluserNodeLabels(toSet("p7", "p8")); + mgr.stop(); + + /* + * Restart, add label p9 and shutdown + */ + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + mgr.addToCluserNodeLabels(toSet("p9")); + mgr.stop(); + + /* + * Recovery, and see if p9 added + */ + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(6, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabels().containsAll( + Arrays.asList("p2", "p4", "p6", "p7", "p8", "p9"))); + mgr.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java new file mode 100644 index 00000000000..1d7f6f15995 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.ImmutableSet; + +public class RMNodeLabelsManager extends CommonNodeLabelsManager { + + protected static class Queue { + protected Set acccessibleNodeLabels; + protected Resource resource; + + protected Queue() { + acccessibleNodeLabels = + Collections.newSetFromMap(new ConcurrentHashMap()); + resource = Resource.newInstance(0, 0); + } + } + + ConcurrentMap queueCollections = + new ConcurrentHashMap(); + protected AccessControlList adminAcl; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + adminAcl = + new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + } + + @Override + public void addLabelsToNode(Map> addedLabelsToNode) + throws IOException { + try { + writeLock.lock(); + + // get nodesCollection before edition + Map before = cloneNodeMap(addedLabelsToNode.keySet()); + + super.addLabelsToNode(addedLabelsToNode); + + // get nodesCollection after edition + Map after = cloneNodeMap(addedLabelsToNode.keySet()); + + // update running nodes resources + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + protected void checkRemoveFromClusterNodeLabelsOfQueue( + Collection labelsToRemove) throws IOException { + // Check if label to remove doesn't existed or null/empty, will throw + // exception if any of labels to remove doesn't meet requirement + for (String label : labelsToRemove) { + label = normalizeLabel(label); + + // check if any queue contains this label + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + Set queueLabels = entry.getValue().acccessibleNodeLabels; + if (queueLabels.contains(label)) { + throw new IOException("Cannot remove label=" + label + + ", because queue=" + queueName + " is using this label. " + + "Please remove label on queue before remove the label"); + } + } + } + } + + @Override + public void removeFromClusterNodeLabels(Collection labelsToRemove) + throws IOException { + try { + writeLock.lock(); + + checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove); + + // copy before NMs + Map before = cloneNodeMap(); + + super.removeFromClusterNodeLabels(labelsToRemove); + + updateResourceMappings(before, nodeCollections); + } finally { + writeLock.unlock(); + } + } + + @Override + public void + removeLabelsFromNode(Map> removeLabelsFromNode) + throws IOException { + try { + writeLock.lock(); + + // get nodesCollection before edition + Map before = + cloneNodeMap(removeLabelsFromNode.keySet()); + + super.removeLabelsFromNode(removeLabelsFromNode); + + // get nodesCollection before edition + Map after = cloneNodeMap(removeLabelsFromNode.keySet()); + + // update running nodes resources + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + @Override + public void replaceLabelsOnNode(Map> replaceLabelsToNode) + throws IOException { + try { + writeLock.lock(); + + // get nodesCollection before edition + Map before = cloneNodeMap(replaceLabelsToNode.keySet()); + + super.replaceLabelsOnNode(replaceLabelsToNode); + + // get nodesCollection after edition + Map after = cloneNodeMap(replaceLabelsToNode.keySet()); + + // update running nodes resources + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + + /* + * Following methods are used for setting if a node is up and running, and it + * will update running nodes resource + */ + public void activateNode(NodeId nodeId, Resource resource) { + try { + writeLock.lock(); + + // save if we have a node before + Map before = cloneNodeMap(ImmutableSet.of(nodeId)); + + createNodeIfNonExisted(nodeId); + Node nm = getNMInNodeSet(nodeId); + nm.resource = resource; + nm.running = true; + + // get the node after edition + Map after = cloneNodeMap(ImmutableSet.of(nodeId)); + + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + /* + * Following methods are used for setting if a node unregistered to RM + */ + public void deactivateNode(NodeId nodeId) { + try { + writeLock.lock(); + + // save if we have a node before + Map before = cloneNodeMap(ImmutableSet.of(nodeId)); + Node nm = getNMInNodeSet(nodeId); + if (null != nm) { + // set nm is not running, and its resource = 0 + nm.running = false; + nm.resource = Resource.newInstance(0, 0); + } + + // get the node after edition + Map after = cloneNodeMap(ImmutableSet.of(nodeId)); + + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + public void updateNodeResource(NodeId node, Resource newResource) { + deactivateNode(node); + activateNode(node, newResource); + } + + public void reinitializeQueueLabels(Map> queueToLabels) { + try { + writeLock.lock(); + // clear before set + this.queueCollections.clear(); + + for (Entry> entry : queueToLabels.entrySet()) { + String queue = entry.getKey(); + Queue q = new Queue(); + this.queueCollections.put(queue, q); + + Set labels = entry.getValue(); + if (labels.contains(ANY)) { + continue; + } + + q.acccessibleNodeLabels.addAll(labels); + for (Host host : nodeCollections.values()) { + for (Entry nentry : host.nms.entrySet()) { + NodeId nodeId = nentry.getKey(); + Node nm = nentry.getValue(); + if (nm.running && isNodeUsableByQueue(getLabelsByNode(nodeId), q)) { + Resources.addTo(q.resource, nm.resource); + } + } + } + } + } finally { + writeLock.unlock(); + } + } + + public Resource getQueueResource(String queueName, Set queueLabels, + Resource clusterResource) { + try { + readLock.lock(); + if (queueLabels.contains(ANY)) { + return clusterResource; + } + Queue q = queueCollections.get(queueName); + if (null == q) { + return Resources.none(); + } + return q.resource; + } finally { + readLock.unlock(); + } + } + + public Set getLabelsOnNode(NodeId nodeId) { + try { + readLock.lock(); + Set nodeLabels = getLabelsByNode(nodeId); + return Collections.unmodifiableSet(nodeLabels); + } finally { + readLock.unlock(); + } + } + + public boolean containsNodeLabel(String label) { + try { + readLock.lock(); + return label != null + && (label.isEmpty() || labelCollections.containsKey(label)); + } finally { + readLock.unlock(); + } + } + + private Map cloneNodeMap(Set nodesToCopy) { + Map map = new HashMap(); + for (NodeId nodeId : nodesToCopy) { + if (!map.containsKey(nodeId.getHost())) { + Host originalN = nodeCollections.get(nodeId.getHost()); + if (null == originalN) { + continue; + } + Host n = originalN.copy(); + n.nms.clear(); + map.put(nodeId.getHost(), n); + } + + Host n = map.get(nodeId.getHost()); + if (WILDCARD_PORT == nodeId.getPort()) { + for (Entry entry : nodeCollections + .get(nodeId.getHost()).nms.entrySet()) { + n.nms.put(entry.getKey(), entry.getValue().copy()); + } + } else { + Node nm = getNMInNodeSet(nodeId); + if (null != nm) { + n.nms.put(nodeId, nm.copy()); + } + } + } + return map; + } + + private void updateResourceMappings(Map before, + Map after) { + // Get NMs in before only + Set allNMs = new HashSet(); + for (Entry entry : before.entrySet()) { + allNMs.addAll(entry.getValue().nms.keySet()); + } + for (Entry entry : after.entrySet()) { + allNMs.addAll(entry.getValue().nms.keySet()); + } + + // traverse all nms + for (NodeId nodeId : allNMs) { + Node oldNM; + if ((oldNM = getNMInNodeSet(nodeId, before, true)) != null) { + Set oldLabels = getLabelsByNode(nodeId, before); + // no label in the past + if (oldLabels.isEmpty()) { + // update labels + Label label = labelCollections.get(NO_LABEL); + Resources.subtractFrom(label.resource, oldNM.resource); + + // update queues, all queue can access this node + for (Queue q : queueCollections.values()) { + Resources.subtractFrom(q.resource, oldNM.resource); + } + } else { + // update labels + for (String labelName : oldLabels) { + Label label = labelCollections.get(labelName); + if (null == label) { + continue; + } + Resources.subtractFrom(label.resource, oldNM.resource); + } + + // update queues, only queue can access this node will be subtract + for (Queue q : queueCollections.values()) { + if (isNodeUsableByQueue(oldLabels, q)) { + Resources.subtractFrom(q.resource, oldNM.resource); + } + } + } + } + + Node newNM; + if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) { + Set newLabels = getLabelsByNode(nodeId, after); + // no label in the past + if (newLabels.isEmpty()) { + // update labels + Label label = labelCollections.get(NO_LABEL); + Resources.addTo(label.resource, newNM.resource); + + // update queues, all queue can access this node + for (Queue q : queueCollections.values()) { + Resources.addTo(q.resource, newNM.resource); + } + } else { + // update labels + for (String labelName : newLabels) { + Label label = labelCollections.get(labelName); + Resources.addTo(label.resource, newNM.resource); + } + + // update queues, only queue can access this node will be subtract + for (Queue q : queueCollections.values()) { + if (isNodeUsableByQueue(newLabels, q)) { + Resources.addTo(q.resource, newNM.resource); + } + } + } + } + } + } + + public Resource getResourceByLabel(String label, Resource clusterResource) { + label = normalizeLabel(label); + try { + readLock.lock(); + if (null == labelCollections.get(label)) { + return Resources.none(); + } + return labelCollections.get(label).resource; + } finally { + readLock.unlock(); + } + } + + private boolean isNodeUsableByQueue(Set nodeLabels, Queue q) { + // node without any labels can be accessed by any queue + if (nodeLabels == null || nodeLabels.isEmpty() + || (nodeLabels.size() == 1 && nodeLabels.contains(NO_LABEL))) { + return true; + } + + for (String label : nodeLabels) { + if (q.acccessibleNodeLabels.contains(label)) { + return true; + } + } + + return false; + } + + private Map cloneNodeMap() { + Set nodesToCopy = new HashSet(); + for (String nodeName : nodeCollections.keySet()) { + nodesToCopy.add(NodeId.newInstance(nodeName, WILDCARD_PORT)); + } + return cloneNodeMap(nodesToCopy); + } + + public boolean checkAccess(UserGroupInformation user) { + // make sure only admin can invoke + // this method + if (adminAcl.isUserAllowed(user)) { + return true; + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java new file mode 100644 index 00000000000..14bd99984c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore; + +public class DummyRMNodeLabelsManager extends RMNodeLabelsManager { + Map> lastNodeToLabels = null; + Collection lastAddedlabels = null; + Collection lastRemovedlabels = null; + + @Override + public void initNodeLabelStore(Configuration conf) { + this.store = new NodeLabelsStore(this) { + + @Override + public void recover() throws IOException { + // do nothing + } + + @Override + public void removeClusterNodeLabels(Collection labels) + throws IOException { + // do nothing + } + + @Override + public void updateNodeToLabelsMappings( + Map> nodeToLabels) throws IOException { + // do nothing + } + + @Override + public void storeNewClusterNodeLabels(Set label) throws IOException { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } + }; + } + + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java new file mode 100644 index 00000000000..26284e2acbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class TestRMNodeLabelsManager extends NodeLabelTestBase { + private final Resource EMPTY_RESOURCE = Resource.newInstance(0, 0); + private final Resource SMALL_RESOURCE = Resource.newInstance(100, 0); + private final Resource LARGE_NODE = Resource.newInstance(1000, 0); + + DummyRMNodeLabelsManager mgr = null; + + @Before + public void before() { + mgr = new DummyRMNodeLabelsManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test(timeout = 5000) + public void testNodeActiveDeactiveUpdate() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3"))); + + Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + EMPTY_RESOURCE); + + // active two NM to n1, one large and one small + mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Resources.add(SMALL_RESOURCE, LARGE_NODE)); + + // change the large NM to small, check if resource updated + mgr.updateNodeResource(NodeId.newInstance("n1", 2), SMALL_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Resources.multiply(SMALL_RESOURCE, 2)); + + // deactive one NM, and check if resource updated + mgr.deactivateNode(NodeId.newInstance("n1", 1)); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE); + + // continus deactive, check if resource updated + mgr.deactivateNode(NodeId.newInstance("n1", 2)); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); + + // Add two NM to n1 back + mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE); + + // And remove p1, now the two NM should come to default label, + mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1")); + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + Resources.add(SMALL_RESOURCE, LARGE_NODE)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 5000) + public void testUpdateNodeLabelWithActiveNode() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3"))); + + // active two NM to n1, one large and one small + mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE); + + // change label of n1 to p2 + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2"))); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), SMALL_RESOURCE); + + // add more labels + mgr.addToCluserNodeLabels(toSet("p4", "p5", "p6")); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p1"), + toNodeId("n5"), toSet("p2"), toNodeId("n6"), toSet("p3"), + toNodeId("n7"), toSet("p4"), toNodeId("n8"), toSet("p5"))); + + // now node -> label is, + // p1 : n4 + // p2 : n1, n2, n5 + // p3 : n3, n6 + // p4 : n7 + // p5 : n8 + // no-label : n9 + + // active these nodes + mgr.activateNode(NodeId.newInstance("n4", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n5", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n6", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n7", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n8", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n9", 1), SMALL_RESOURCE); + + // check varibles + Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Resources.multiply(SMALL_RESOURCE, 3)); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p4", null), + Resources.multiply(SMALL_RESOURCE, 1)); + Assert.assertEquals(mgr.getResourceByLabel("p5", null), + Resources.multiply(SMALL_RESOURCE, 1)); + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + Resources.multiply(SMALL_RESOURCE, 1)); + + // change a bunch of nodes -> labels + // n4 -> p2 + // n7 -> empty + // n5 -> p1 + // n8 -> empty + // n9 -> p1 + // + // now become: + // p1 : n5, n9 + // p2 : n1, n2, n4 + // p3 : n3, n6 + // p4 : [ ] + // p5 : [ ] + // no label: n8, n7 + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p2"), + toNodeId("n7"), RMNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n5"), + toSet("p1"), toNodeId("n8"), RMNodeLabelsManager.EMPTY_STRING_SET, + toNodeId("n9"), toSet("p1"))); + + // check varibles + Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Resources.multiply(SMALL_RESOURCE, 3)); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p4", null), + Resources.multiply(SMALL_RESOURCE, 0)); + Assert.assertEquals(mgr.getResourceByLabel("p5", null), + Resources.multiply(SMALL_RESOURCE, 0)); + Assert.assertEquals(mgr.getResourceByLabel("", null), + Resources.multiply(SMALL_RESOURCE, 2)); + } + + @Test(timeout=5000) + public void testGetQueueResource() throws Exception { + Resource clusterResource = Resource.newInstance(9999, 1); + + /* + * Node->Labels: + * host1 : red, blue + * host2 : blue, yellow + * host3 : yellow + * host4 : + */ + mgr.addToCluserNodeLabels(toSet("red", "blue", "yellow")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host1"), + toSet("red", "blue"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host2"), + toSet("blue", "yellow"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host3"), toSet("yellow"))); + + // active two NM to n1, one large and one small + mgr.activateNode(NodeId.newInstance("host1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host2", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host4", 1), SMALL_RESOURCE); + + // reinitialize queue + Set q1Label = toSet("red", "blue"); + Set q2Label = toSet("blue", "yellow"); + Set q3Label = toSet("yellow"); + Set q4Label = RMNodeLabelsManager.EMPTY_STRING_SET; + Set q5Label = toSet(RMNodeLabelsManager.ANY); + + Map> queueToLabels = new HashMap>(); + queueToLabels.put("Q1", q1Label); + queueToLabels.put("Q2", q2Label); + queueToLabels.put("Q3", q3Label); + queueToLabels.put("Q4", q4Label); + queueToLabels.put("Q5", q5Label); + + mgr.reinitializeQueueLabels(queueToLabels); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("host1"), toSet("red"), + toNodeId("host2"), toSet("blue", "yellow"))); + mgr.addLabelsToNode(ImmutableMap.of(toNodeId("host3"), toSet("red"))); + /* + * Check resource after changes some labels + * Node->Labels: + * host1 : blue (was: red, blue) + * host2 : (was: blue, yellow) + * host3 : red, yellow (was: yellow) + * host4 : + */ + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Check resource after deactive/active some nodes + * Node->Labels: + * (deactived) host1 : blue + * host2 : + * (deactived and then actived) host3 : red, yellow + * host4 : + */ + mgr.deactivateNode(NodeId.newInstance("host1", 1)); + mgr.deactivateNode(NodeId.newInstance("host3", 1)); + mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Check resource after refresh queue: + * Q1: blue + * Q2: red, blue + * Q3: red + * Q4: + * Q5: ANY + */ + q1Label = toSet("blue"); + q2Label = toSet("blue", "red"); + q3Label = toSet("red"); + q4Label = RMNodeLabelsManager.EMPTY_STRING_SET; + q5Label = toSet(RMNodeLabelsManager.ANY); + + queueToLabels.clear(); + queueToLabels.put("Q1", q1Label); + queueToLabels.put("Q2", q2Label); + queueToLabels.put("Q3", q3Label); + queueToLabels.put("Q4", q4Label); + queueToLabels.put("Q5", q5Label); + + mgr.reinitializeQueueLabels(queueToLabels); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Active NMs in nodes already have NM + * Node->Labels: + * host2 : + * host3 : red, yellow (3 NMs) + * host4 : (2 NMs) + */ + mgr.activateNode(NodeId.newInstance("host3", 2), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host3", 3), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host4", 2), SMALL_RESOURCE); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Deactive NMs in nodes already have NMs + * Node->Labels: + * host2 : + * host3 : red, yellow (2 NMs) + * host4 : (0 NMs) + */ + mgr.deactivateNode(NodeId.newInstance("host3", 3)); + mgr.deactivateNode(NodeId.newInstance("host4", 2)); + mgr.deactivateNode(NodeId.newInstance("host4", 1)); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + } +}