diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b22777cb98c..07b63392744 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -175,6 +175,9 @@ Release 2.8.0 - UNRELEASED YARN-4055. Report node resource utilization in heartbeat. (Inigo Goiri via kasha) + YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed + Node Label Configuration Setup. (Naganarasimha G R) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6c438f28372..55eac85e2a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1967,6 +1967,36 @@ public class YarnConfiguration extends Configuration { NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); } + private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX + + "node-labels."; + + public static final String NM_NODE_LABELS_PROVIDER_CONFIG = + NM_NODE_LABELS_PREFIX + "provider"; + + // whitelist names for the yarn.nodemanager.node-labels.provider + public static final String CONFIG_NODE_LABELS_PROVIDER = "config"; + + private static final String NM_NODE_LABELS_PROVIDER_PREFIX = + NM_NODE_LABELS_PREFIX + "provider."; + + // If -1 is configured then no timer task should be created + public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms"; + + public static final String NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS = + NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-timeout-ms"; + + // once in 10 mins + public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + 10 * 60 * 1000; + + // Twice of default interval time + public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS = + DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS * 2; + + public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS = + NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index 34e68327343..8cc37705206 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -916,7 +916,7 @@ public class CommonNodeLabelsManager extends AbstractService { } } - private void checkAndThrowLabelName(String label) throws IOException { + public static void checkAndThrowLabelName(String label) throws IOException { if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { throw new IOException("label added is empty or exceeds " + MAX_LABEL_LENGTH + " character(s)"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 53face0a3c5..00a9fbaa764 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2092,6 +2092,53 @@ centralized + + + + When node labels "yarn.node-labels.configuration-type" is + of type "distributed" Administrators can configure the source of the + node labels provider by configuring this parameter. Administrators can + specify either "config" or the class name of the provider. Configured + class needs to extend + org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider. + If "config" is specified then, "ConfigurationNodeLabelsProvider" will + be used. + + yarn.nodemanager.node-labels.provider + + + + + When node labels "yarn.nodemanager.node-labels.provider" is of type + "config" or the configured class extends AbstractNodeLabelsProvider then + periodically node labels are retrieved from the node labels provider. + This configuration is to define the interval. If -1 is configured then + node labels are retrieved from. provider only during initialization. + Defaults to 10 mins. + + yarn.nodemanager.node-labels.provider.fetch-interval-ms + 600000 + + + + + When node labels "yarn.nodemanager.node-labels.provider" + is of type "config" then ConfigurationNodeLabelsProvider fetches the + labels this parameter. + + yarn.nodemanager.node-labels.provider.configured-node-labels + + + + + When node labels "yarn.nodemanager.node-labels.provider" is a class + which extends AbstractNodeLabelsProvider then this configuration provides + the timeout period after which it will stop querying the Node labels + provider. Defaults to 20 mins. + + yarn.nodemanager.node-labels.provider.fetch-timeout-ms + 1200000 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 327171b1198..68820a7b6d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -122,12 +123,38 @@ public class NodeManager extends CompositeService metrics, nodeLabelsProvider); } - @VisibleForTesting - protected NodeLabelsProvider createNodeLabelsProvider( - Configuration conf) throws IOException { - // TODO as part of YARN-2729 - // Need to get the implementation of provider service and return - return null; + protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) + throws IOException { + NodeLabelsProvider provider = null; + String providerString = + conf.get(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null); + if (providerString == null || providerString.trim().length() == 0) { + // Seems like Distributed Node Labels configuration is not enabled + return provider; + } + switch (providerString.trim().toLowerCase()) { + case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER: + provider = new ConfigurationNodeLabelsProvider(); + break; + default: + try { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null, + NodeLabelsProvider.class); + provider = labelsProviderClass.newInstance(); + } catch (InstantiationException | IllegalAccessException + | RuntimeException e) { + LOG.error("Failed to create NodeLabelsProvider based on Configuration", + e); + throw new IOException("Failed to create NodeLabelsProvider : " + + e.getMessage(), e); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Distributed Node Labels is enabled" + + " with provider class as : " + provider.getClass().toString()); + } + return provider; } protected NodeResourceMonitor createNodeResourceMonitor() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0680ea388ec..05efc694d20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -137,8 +137,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private boolean registeredWithRM = false; Set pendingContainersToRemove = new HashSet(); - private final NodeLabelsProvider nodeLabelsProvider; - private final boolean hasNodeLabelsProvider; + private NMNodeLabelsHandler nodeLabelsHandler; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -150,8 +149,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements NodeLabelsProvider nodeLabelsProvider) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; - this.nodeLabelsProvider = nodeLabelsProvider; - this.hasNodeLabelsProvider = (nodeLabelsProvider != null); + nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider); this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; @@ -313,13 +311,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements protected void registerWithRM() throws YarnException, IOException { List containerReports = getNMContainerStatuses(); - Set nodeLabels = null; - if (hasNodeLabelsProvider) { - nodeLabels = nodeLabelsProvider.getNodeLabels(); - nodeLabels = - (null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET - : nodeLabels; - } + Set nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), @@ -380,14 +372,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements .append(this.nodeId).append(" with total resource of ") .append(this.totalResource); - if (regNMResponse.getAreNodeLabelsAcceptedByRM()) { - successfullRegistrationMsg - .append(" and with following Node label(s) : {") - .append(StringUtils.join(",", nodeLabels)).append("}"); - } else if (hasNodeLabelsProvider) { - //case where provider is set but RM did not accept the Node Labels - LOG.error(regNMResponse.getDiagnosticsMessage()); - } + successfullRegistrationMsg.append(nodeLabelsHandler + .verifyRMRegistrationResponseForNodeLabels(regNMResponse)); LOG.info(successfullRegistrationMsg); LOG.info("Notifying ContainerManager to unblock new container-requests"); @@ -688,33 +674,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @SuppressWarnings("unchecked") public void run() { int lastHeartbeatID = 0; - Set lastUpdatedNodeLabelsToRM = null; - if (hasNodeLabelsProvider) { - lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels(); - lastUpdatedNodeLabelsToRM = - (null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET - : lastUpdatedNodeLabelsToRM; - } while (!isStopped) { // Send heartbeat try { NodeHeartbeatResponse response = null; - Set nodeLabelsForHeartbeat = null; + Set nodeLabelsForHeartbeat = + nodeLabelsHandler.getNodeLabelsForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); - if (hasNodeLabelsProvider) { - nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels(); - // if the provider returns null then consider empty labels are set - nodeLabelsForHeartbeat = - (nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET - : nodeLabelsForHeartbeat; - if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat, - lastUpdatedNodeLabelsToRM)) { - // if nodelabels have not changed then no need to send - nodeLabelsForHeartbeat = null; - } - } - NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, NodeStatusUpdaterImpl.this.context @@ -740,9 +707,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements updateMasterKeys(response); if (response.getNodeAction() == NodeAction.SHUTDOWN) { - LOG - .warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," - + " hence shutting down."); + LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" + + " heartbeat, hence shutting down."); LOG.warn("Message from ResourceManager: " + response.getDiagnosticsMessage()); context.setDecommissioned(true); @@ -764,16 +730,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements break; } - if (response.getAreNodeLabelsAcceptedByRM()) { - lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat; - LOG.info("Node Labels {" - + StringUtils.join(",", nodeLabelsForHeartbeat) - + "} were Accepted by RM "); - } else if (nodeLabelsForHeartbeat != null) { - // case where NodeLabelsProvider is set and updated labels were - // sent to RM and RM rejected the labels - LOG.error(response.getDiagnosticsMessage()); - } + nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response); // Explicitly put this method after checking the resync response. We // don't want to remove the completed containers before resync @@ -833,23 +790,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - /** - * Caller should take care of sending non null nodelabels for both - * arguments - * - * @param nodeLabelsNew - * @param nodeLabelsOld - * @return if the New node labels are diff from the older one. - */ - private boolean areNodeLabelsUpdated(Set nodeLabelsNew, - Set nodeLabelsOld) { - if (nodeLabelsNew.size() != nodeLabelsOld.size() - || !nodeLabelsOld.containsAll(nodeLabelsNew)) { - return true; - } - return false; - } - private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); @@ -879,4 +819,183 @@ public class NodeStatusUpdaterImpl extends AbstractService implements reports.addAll(logAggregationReportForAppsTempList); return reports; } + + private NMNodeLabelsHandler createNMNodeLabelsHandler( + NodeLabelsProvider nodeLabelsProvider) { + if (nodeLabelsProvider == null) { + return new NMCentralizedNodeLabelsHandler(); + } else { + return new NMDistributedNodeLabelsHandler(nodeLabelsProvider); + } + } + + private static interface NMNodeLabelsHandler { + /** + * validates nodeLabels From Provider and returns it to the caller. Also + * ensures that if provider returns null then empty label set is considered + */ + Set getNodeLabelsForRegistration(); + + /** + * @return RMRegistration Success message and on failure will log + * independently and returns empty string + */ + String verifyRMRegistrationResponseForNodeLabels( + RegisterNodeManagerResponse regNMResponse); + + /** + * If nodeLabels From Provider is different previous node labels then it + * will check the syntax correctness and throws exception if invalid. If + * valid, returns nodeLabels From Provider. Also ensures that if provider + * returns null then empty label set is considered and If labels are not + * modified it returns null. + */ + Set getNodeLabelsForHeartbeat(); + + /** + * check whether if updated labels sent to RM was accepted or not + * @param response + */ + void verifyRMHeartbeatResponseForNodeLabels(NodeHeartbeatResponse response); + } + + /** + * In centralized configuration, NM need not send Node labels or process the + * response + */ + private static class NMCentralizedNodeLabelsHandler + implements NMNodeLabelsHandler { + @Override + public Set getNodeLabelsForHeartbeat() { + return null; + } + + @Override + public Set getNodeLabelsForRegistration() { + return null; + } + + @Override + public void verifyRMHeartbeatResponseForNodeLabels( + NodeHeartbeatResponse response) { + } + + @Override + public String verifyRMRegistrationResponseForNodeLabels( + RegisterNodeManagerResponse regNMResponse) { + return ""; + } + } + + private static class NMDistributedNodeLabelsHandler + implements NMNodeLabelsHandler { + private NMDistributedNodeLabelsHandler( + NodeLabelsProvider nodeLabelsProvider) { + this.nodeLabelsProvider = nodeLabelsProvider; + } + + private final NodeLabelsProvider nodeLabelsProvider; + private Set previousNodeLabels; + private boolean updatedLabelsSentToRM; + + @Override + public Set getNodeLabelsForRegistration() { + Set nodeLabels = nodeLabelsProvider.getNodeLabels(); + nodeLabels = (null == nodeLabels) + ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels; + previousNodeLabels = nodeLabels; + try { + validateNodeLabels(nodeLabels); + } catch (IOException e) { + nodeLabels = null; + } + return nodeLabels; + } + + @Override + public String verifyRMRegistrationResponseForNodeLabels( + RegisterNodeManagerResponse regNMResponse) { + StringBuilder successfulNodeLabelsRegistrationMsg = new StringBuilder(""); + if (regNMResponse.getAreNodeLabelsAcceptedByRM()) { + successfulNodeLabelsRegistrationMsg + .append(" and with following Node label(s) : {") + .append(StringUtils.join(",", previousNodeLabels)).append("}"); + } else { + // case where provider is set but RM did not accept the Node Labels + LOG.error(regNMResponse.getDiagnosticsMessage()); + } + return successfulNodeLabelsRegistrationMsg.toString(); + } + + @Override + public Set getNodeLabelsForHeartbeat() { + Set nodeLabelsForHeartbeat = + nodeLabelsProvider.getNodeLabels(); + // if the provider returns null then consider empty labels are set + nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null) + ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET + : nodeLabelsForHeartbeat; + // take some action only on modification of labels + boolean areNodeLabelsUpdated = + nodeLabelsForHeartbeat.size() != previousNodeLabels.size() + || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat); + + updatedLabelsSentToRM = false; + if (areNodeLabelsUpdated) { + previousNodeLabels = nodeLabelsForHeartbeat; + try { + validateNodeLabels(nodeLabelsForHeartbeat); + updatedLabelsSentToRM = true; + } catch (IOException e) { + // set previous node labels to invalid set, so that invalid + // labels are not verified for every HB, and send empty set + // to RM to have same nodeLabels which was earlier set. + nodeLabelsForHeartbeat = null; + } + } else { + // if nodelabels have not changed then no need to send + nodeLabelsForHeartbeat = null; + } + return nodeLabelsForHeartbeat; + } + + private void validateNodeLabels(Set nodeLabelsForHeartbeat) + throws IOException { + Iterator iterator = nodeLabelsForHeartbeat.iterator(); + boolean hasInvalidLabel = false; + StringBuilder errorMsg = new StringBuilder(""); + while (iterator.hasNext()) { + try { + CommonNodeLabelsManager + .checkAndThrowLabelName(iterator.next().getName()); + } catch (IOException e) { + errorMsg.append(e.getMessage()); + errorMsg.append(" , "); + hasInvalidLabel = true; + } + } + if (hasInvalidLabel) { + LOG.error("Invalid Node Label(s) from Provider : " + errorMsg); + throw new IOException(errorMsg.toString()); + } + } + + @Override + public void verifyRMHeartbeatResponseForNodeLabels( + NodeHeartbeatResponse response) { + if (updatedLabelsSentToRM) { + if (response.getAreNodeLabelsAcceptedByRM()) { + LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels) + + "} were Accepted by RM "); + } else { + // case where updated labels from NodeLabelsProvider is sent to RM and + // RM rejected the labels + LOG.error( + "NM node labels {" + StringUtils.join(",", previousNodeLabels) + + "} were not accepted by RM and message from RM : " + + response.getDiagnosticsMessage()); + } + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java new file mode 100644 index 00000000000..bbc67109060 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.util.HashSet; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Provides base implementation of NodeLabelsProvider with Timer and expects + * subclass to provide TimerTask which can fetch NodeLabels + */ +public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider { + public static final long DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER = -1; + + // Delay after which timer task are triggered to fetch NodeLabels + protected long intervalTime; + + // Timer used to schedule node labels fetching + protected Timer nodeLabelsScheduler; + + public static final String NODE_LABELS_SEPRATOR = ","; + + protected Lock readLock = null; + protected Lock writeLock = null; + + protected TimerTask timerTask; + + protected Set nodeLabels = + CommonNodeLabelsManager.EMPTY_NODELABEL_SET; + + @VisibleForTesting + long startTime = 0; + + public AbstractNodeLabelsProvider(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.intervalTime = + conf.getLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS); + + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + readLock = readWriteLock.readLock(); + writeLock = readWriteLock.writeLock(); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + timerTask = createTimerTask(); + if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) { + nodeLabelsScheduler = + new Timer("DistributedNodeLabelsRunner-Timer", true); + // Start the timer task and then periodically at the configured interval + // time. Illegal values for intervalTime is handled by timer api + nodeLabelsScheduler.scheduleAtFixedRate(timerTask, startTime, + intervalTime); + } + super.serviceStart(); + } + + /** + * terminate the timer + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + if (nodeLabelsScheduler != null) { + nodeLabelsScheduler.cancel(); + } + super.serviceStop(); + } + + /** + * @return Returns output from provider. + */ + @Override + public Set getNodeLabels() { + readLock.lock(); + try { + return nodeLabels; + } finally { + readLock.unlock(); + } + } + + protected void setNodeLabels(Set nodeLabelsSet) { + writeLock.lock(); + try { + nodeLabels = nodeLabelsSet; + } finally { + writeLock.unlock(); + } + } + + /** + * Used only by tests to access the timer task directly + * + * @return the timer task + */ + TimerTask getTimerTask() { + return timerTask; + } + + static Set convertToNodeLabelSet(Set nodeLabels) { + if (null == nodeLabels) { + return null; + } + Set labels = new HashSet(); + for (String label : nodeLabels) { + labels.add(NodeLabel.newInstance(label)); + } + return labels; + } + + public abstract TimerTask createTimerTask(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java new file mode 100644 index 00000000000..f549d1a2824 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.TimerTask; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Provides Node's Labels by constantly monitoring the configuration. + */ +public class ConfigurationNodeLabelsProvider extends AbstractNodeLabelsProvider { + + private static final Log LOG = LogFactory + .getLog(ConfigurationNodeLabelsProvider.class); + + public ConfigurationNodeLabelsProvider() { + super("Configuration Based NodeLabels Provider"); + } + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + // In case timer is not configured avoid calling timertask.run thus avoiding + // unnecessary creation of YarnConfiguration Object + updateNodeLabelsFromConfig(conf); + if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) { + startTime = new Date().getTime() + intervalTime; + } + } + + private void updateNodeLabelsFromConfig(Configuration conf) + throws IOException { + String confLabelString = + conf.get(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, null); + String[] nodeLabelsFromConfiguration = + (confLabelString == null || confLabelString.isEmpty()) ? new String[] {} + : StringUtils.getStrings(confLabelString); + setNodeLabels(convertToNodeLabelSet(new HashSet( + Arrays.asList(nodeLabelsFromConfiguration)))); + } + + private class ConfigurationMonitorTimerTask extends TimerTask { + @Override + public void run() { + try { + updateNodeLabelsFromConfig(new YarnConfiguration()); + } catch (Exception e) { + LOG.error("Failed to update node Labels from configuration.xml ", e); + } + } + } + + @Override + public TimerTask createTimerTask() { + return new ConfigurationMonitorTimerTask(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java index 20ef7128445..2d390ac998a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java @@ -22,8 +22,11 @@ import static org.junit.Assert.fail; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.junit.Assert; import org.junit.Test; public class TestNodeManager { @@ -53,5 +56,50 @@ public class TestNodeManager { nm.stop(); } } - + + @Test + public void testCreationOfNodeLabelsProviderService() + throws InterruptedException { + try { + NodeManager nodeManager = new NodeManager(); + Configuration conf = new Configuration(); + NodeLabelsProvider labelsProviderService = + nodeManager.createNodeLabelsProvider(conf); + Assert + .assertNull( + "LabelsProviderService should not be initialized in default configuration", + labelsProviderService); + + // With valid className + conf.set( + YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, + "org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider"); + labelsProviderService = nodeManager.createNodeLabelsProvider(conf); + Assert.assertNotNull("LabelsProviderService should be initialized When " + + "node labels provider class is configured", labelsProviderService); + + // With invalid className + conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, + "org.apache.hadoop.yarn.server.nodemanager.NodeManager"); + try { + labelsProviderService = nodeManager.createNodeLabelsProvider(conf); + Assert.fail("Expected to throw IOException on Invalid configuration"); + } catch (IOException e) { + // exception expected on invalid configuration + } + Assert.assertNotNull("LabelsProviderService should be initialized When " + + "node labels provider class is configured", labelsProviderService); + + // With valid whitelisted configurations + conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, + YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER); + labelsProviderService = nodeManager.createNodeLabelsProvider(conf); + Assert.assertNotNull("LabelsProviderService should be initialized When " + + "node labels provider class is configured", labelsProviderService); + + } catch (Exception e) { + Assert.fail("Exception caught"); + e.printStackTrace(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java index 7e1bbd85771..099e4b42d82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -108,7 +109,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { if (receivedNMHeartbeat) { return; } - int i = 500; + int i = 10; while (!receivedNMHeartbeat && i > 0) { synchronized (ResourceTrackerForLabels.class) { if (!receivedNMHeartbeat) { @@ -193,7 +194,6 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { public static class DummyNodeLabelsProvider extends NodeLabelsProvider { - @SuppressWarnings("unchecked") private Set nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET; public DummyNodeLabelsProvider() { @@ -224,8 +224,8 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { new ResourceTrackerForLabels(); nm = new NodeManager() { @Override - protected NodeLabelsProvider createNodeLabelsProvider( - Configuration conf) throws IOException { + protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) + throws IOException { return dummyLabelsProviderRef; } @@ -255,8 +255,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { nm.start(); resourceTracker.waitTillRegister(); assertNLCollectionEquals(resourceTracker.labels, - dummyLabelsProviderRef - .getNodeLabels()); + dummyLabelsProviderRef.getNodeLabels()); resourceTracker.waitTillHeartbeat();// wait till the first heartbeat resourceTracker.resetNMHeartbeatReceiveFlag(); @@ -278,15 +277,76 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { assertNull( "If no change in labels then null should be sent as part of request", resourceTracker.labels); - + // provider return with null labels - dummyLabelsProviderRef.setNodeLabels(null); + dummyLabelsProviderRef.setNodeLabels(null); nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); + assertNotNull( + "If provider sends null then empty label set should be sent and not null", + resourceTracker.labels); assertTrue("If provider sends null then empty labels should be sent", resourceTracker.labels.isEmpty()); resourceTracker.resetNMHeartbeatReceiveFlag(); nm.stop(); } + + @Test + public void testInvalidNodeLabelsFromProvider() throws InterruptedException, + IOException { + final ResourceTrackerForLabels resourceTracker = + new ResourceTrackerForLabels(); + nm = new NodeManager() { + @Override + protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) + throws IOException { + return dummyLabelsProviderRef; + } + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProvider labelsProvider) { + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics, labelsProvider) { + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected void stopRMProxy() { + return; + } + }; + } + }; + dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P")); + YarnConfiguration conf = createNMConfigForDistributeNodeLabels(); + nm.init(conf); + resourceTracker.resetNMHeartbeatReceiveFlag(); + nm.start(); + resourceTracker.waitTillHeartbeat();// wait till the first heartbeat + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat with invalid labels + dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("_.P")); + + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertNull("On Invalid Labels we need to retain earlier labels, HB " + + "needs to send null", resourceTracker.labels); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // on next heartbeat same invalid labels will be given by the provider, but + // again label validation check and reset RM with empty labels set should + // not happen + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + resourceTracker.resetNMHeartbeatReceiveFlag(); + assertNull("NodeStatusUpdater need not send repeatedly empty labels on " + + "invalid labels from provider ", resourceTracker.labels); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java new file mode 100644 index 00000000000..27fd4cb3e86 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.TimerTask; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase { + + protected static File testRootDir = new File("target", + TestConfigurationNodeLabelsProvider.class.getName() + "-localDir") + .getAbsoluteFile(); + + final static File nodeLabelsConfigFile = new File(testRootDir, + "yarn-site.xml"); + + private static XMLPathClassLoader loader; + + private ConfigurationNodeLabelsProvider nodeLabelsProvider; + + @Before + public void setup() { + loader = + new XMLPathClassLoader( + TestConfigurationNodeLabelsProvider.class.getClassLoader()); + testRootDir.mkdirs(); + + nodeLabelsProvider = new ConfigurationNodeLabelsProvider(); + } + + @After + public void tearDown() throws Exception { + if (nodeLabelsProvider != null) { + nodeLabelsProvider.close(); + } + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + } + + private Configuration getConfForNodeLabels() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, "A,B,CX"); + return conf; + } + + @Test + public void testNodeLabelsFromConfig() throws IOException, + InterruptedException { + Configuration conf = getConfForNodeLabels(); + nodeLabelsProvider.init(conf); + // test for ensuring labels are set during initialization of the class + nodeLabelsProvider.start(); + Thread.sleep(1000l); // sleep so that timer has run once during + // initialization + assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"), + nodeLabelsProvider.getNodeLabels()); + + // test for valid Modification + TimerTask timerTask = nodeLabelsProvider.getTimerTask(); + modifyConfAndCallTimer(timerTask, "X,y,Z"); + assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"), + nodeLabelsProvider.getNodeLabels()); + } + + @Test + public void testConfigForNoTimer() throws Exception { + Configuration conf = getConfForNodeLabels(); + conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER); + + nodeLabelsProvider.init(conf); + nodeLabelsProvider.start(); + Assert + .assertNull( + "Timer is not expected to be created when interval is configured as -1", + nodeLabelsProvider.nodeLabelsScheduler); + // Ensure that even though timer is not run, node labels are fetched at least once so + // that NM registers/updates Labels with RM + assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"), + nodeLabelsProvider.getNodeLabels()); + } + + private static void modifyConfAndCallTimer(TimerTask timerTask, + String nodeLabels) throws FileNotFoundException, IOException { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels); + conf.writeXml(new FileOutputStream(nodeLabelsConfigFile)); + ClassLoader actualLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(loader); + timerTask.run(); + } finally { + Thread.currentThread().setContextClassLoader(actualLoader); + } + } + + private static class XMLPathClassLoader extends ClassLoader { + public XMLPathClassLoader(ClassLoader wrapper) { + super(wrapper); + } + + public URL getResource(String name) { + if (name.equals(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)) { + try { + return nodeLabelsConfigFile.toURI().toURL(); + } catch (MalformedURLException e) { + e.printStackTrace(); + Assert.fail(); + } + } + return super.getResource(name); + } + } +}