diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5bae5f49ad8..5ca99983c0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3548,9 +3548,12 @@ public class YarnConfiguration extends Configuration { public static final String NM_NODE_LABELS_PROVIDER_CONFIG = NM_NODE_LABELS_PREFIX + "provider"; + public static final String NM_NODE_ATTRIBUTES_PROVIDER_CONFIG = + NM_NODE_ATTRIBUTES_PREFIX + "provider"; + // whitelist names for the yarn.nodemanager.node-labels.provider - public static final String CONFIG_NODE_LABELS_PROVIDER = "config"; - public static final String SCRIPT_NODE_LABELS_PROVIDER = "script"; + public static final String CONFIG_NODE_DESCRIPTOR_PROVIDER = "config"; + public static final String SCRIPT_NODE_DESCRIPTOR_PROVIDER = "script"; private static final String NM_NODE_LABELS_PROVIDER_PREFIX = NM_NODE_LABELS_PREFIX + "provider."; @@ -3582,6 +3585,9 @@ public class YarnConfiguration extends Configuration { public static final String NM_PROVIDER_CONFIGURED_NODE_PARTITION = NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-partition"; + public static final String NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES = + NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "configured-node-attributes"; + private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX + "node-labels."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java index effda9b6703..ffa33cfe962 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java @@ -35,15 +35,18 @@ public abstract class NodeAttributesManager extends AbstractService { /** * To completely replace the mappings for a given node with the new Set of - * Attributes. If the mapping contains an attribute whose type does not match - * a previously existing Attribute under the same prefix (name space) then - * exception is thrown. Key would be name of the node and value would be set - * of Attributes to be mapped. + * Attributes which are under a given prefix. If the mapping contains an + * attribute whose type does not match a previously existing Attribute + * under the same prefix (name space) then exception is thrown. + * Key would be name of the node and value would be set of Attributes to + * be mapped. If the prefix is null, then all node attributes will be + * replaced regardless of what prefix they have. * - * @param nodeAttributeMapping - * @throws IOException + * @param prefix node attribute prefix + * @param nodeAttributeMapping host name to a set of node attributes mapping + * @throws IOException if failed to replace attributes */ - public abstract void replaceNodeAttributes( + public abstract void replaceNodeAttributes(String prefix, Map> nodeAttributeMapping) throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java index fdfd0ce7033..93a27a9ebfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute; import java.io.IOException; import java.util.Set; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Utility class for all NodeLabel and NodeAttribute operations. @@ -125,4 +126,22 @@ public final class NodeLabelUtil { } } } + + /** + * Filter a set of node attributes by a given prefix. Returns a filtered + * set of node attributes whose prefix equals the given prefix. + * If the prefix is null or empty, then the original set is returned. + * @param attributeSet node attribute set + * @param prefix node attribute prefix + * @return a filtered set of node attributes + */ + public static Set filterAttributesByPrefix( + Set attributeSet, String prefix) { + if (Strings.isNullOrEmpty(prefix)) { + return attributeSet; + } + return attributeSet.stream().filter( + nodeAttribute -> prefix.equals(nodeAttribute.getAttributePrefix())) + .collect(Collectors.toSet()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 65b2a6d6161..cdc3c093303 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2902,6 +2902,20 @@ + + + This property determines which provider will be plugged by the + node manager to collect node-attributes. Administrators can + configure "config", "script" or the class name of the provider. + Configured class needs to extend + org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider. + If "config" is configured, then "ConfigurationNodeLabelsProvider" and if + "script" is configured, then "ScriptBasedNodeAttributesProvider" + will be used. + + yarn.nodemanager.node-attributes.provider + + The node attribute script NM runs to collect node attributes. @@ -2939,6 +2953,16 @@ 1200000 + + + When "yarn.nodemanager.node-attributes.provider" is configured with + "config" then ConfigurationNodeAttributesProvider fetches node attributes + from this parameter. + + yarn.nodemanager.node-attributes.provider.configured-node-attributes + + + Timeout in seconds for YARN node graceful decommission. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b54a6b7400e..6eda4a80b77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -66,6 +66,9 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeAttributesProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeAttributesProvider; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -123,6 +126,7 @@ public class NodeManager extends CompositeService private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; private NodeLabelsProvider nodeLabelsProvider; + private NodeAttributesProvider nodeAttributesProvider; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -162,14 +166,45 @@ public class NodeManager extends CompositeService protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, nodeLabelsProvider); + metrics); } - protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker, - NodeLabelsProvider nodeLabelsProvider) { - return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, nodeLabelsProvider); + protected NodeAttributesProvider createNodeAttributesProvider( + Configuration conf) throws IOException { + NodeAttributesProvider attributesProvider = null; + String providerString = + conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null); + if (providerString == null || providerString.trim().length() == 0) { + return attributesProvider; + } + switch (providerString.trim().toLowerCase()) { + case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER: + attributesProvider = new ConfigurationNodeAttributesProvider(); + break; + case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER: + attributesProvider = new ScriptBasedNodeAttributesProvider(); + break; + default: + try { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, + null, NodeAttributesProvider.class); + attributesProvider = labelsProviderClass.newInstance(); + } catch (InstantiationException | IllegalAccessException + | RuntimeException e) { + LOG.error("Failed to create NodeAttributesProvider" + + " based on Configuration", e); + throw new IOException( + "Failed to create NodeAttributesProvider : " + + e.getMessage(), e); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Distributed Node Attributes is enabled" + + " with provider class as : " + + attributesProvider.getClass().toString()); + } + return attributesProvider; } protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) @@ -182,10 +217,10 @@ public class NodeManager extends CompositeService return provider; } switch (providerString.trim().toLowerCase()) { - case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER: + case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER: provider = new ConfigurationNodeLabelsProvider(); break; - case YarnConfiguration.SCRIPT_NODE_LABELS_PROVIDER: + case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER: provider = new ScriptBasedNodeLabelsProvider(); break; default: @@ -407,16 +442,19 @@ public class NodeManager extends CompositeService ((NMContext)context).setContainerExecutor(exec); ((NMContext)context).setDeletionService(del); - nodeLabelsProvider = createNodeLabelsProvider(conf); + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); - if (null == nodeLabelsProvider) { - nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); - } else { + nodeLabelsProvider = createNodeLabelsProvider(conf); + if (nodeLabelsProvider != null) { addIfService(nodeLabelsProvider); - nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, - nodeLabelsProvider); + nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider); + } + + nodeAttributesProvider = createNodeAttributesProvider(conf); + if (nodeAttributesProvider != null) { + addIfService(nodeAttributesProvider); + nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider); } nodeResourceMonitor = createNodeResourceMonitor(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 08892d20799..142cbbc9cbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; public interface NodeStatusUpdater extends Service { @@ -59,4 +61,16 @@ public interface NodeStatusUpdater extends Service { * @param ex exception that makes the node unhealthy */ void reportException(Exception ex); + + /** + * Sets a node attributes provider to node manager. + * @param provider + */ + void setNodeAttributesProvider(NodeAttributesProvider provider); + + /** + * Sets a node labels provider to the node manager. + * @param provider + */ + void setNodeLabelsProvider(NodeLabelsProvider provider); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 7be9ef72d26..df76ed715dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -85,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; @@ -152,21 +154,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Set pendingContainersToRemove = new HashSet(); private NMNodeLabelsHandler nodeLabelsHandler; - private final NodeLabelsProvider nodeLabelsProvider; + private NMNodeAttributesHandler nodeAttributesHandler; + private NodeLabelsProvider nodeLabelsProvider; + private NodeAttributesProvider nodeAttributesProvider; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - this(context, dispatcher, healthChecker, metrics, null); - } - - public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, - NodeLabelsProvider nodeLabelsProvider) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; this.context = context; this.dispatcher = dispatcher; - this.nodeLabelsProvider = nodeLabelsProvider; this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); this.pendingCompletedContainers = @@ -175,6 +172,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new ArrayList(); } + @Override + public void setNodeAttributesProvider(NodeAttributesProvider provider) { + this.nodeAttributesProvider = provider; + } + + @Override + public void setNodeLabelsProvider(NodeLabelsProvider provider) { + this.nodeLabelsProvider = provider; + } + @Override protected void serviceInit(Configuration conf) throws Exception { this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf); @@ -214,7 +221,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION); - nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider); + nodeLabelsHandler = + createNMNodeLabelsHandler(nodeLabelsProvider); + nodeAttributesHandler = + createNMNodeAttributesHandler(nodeAttributesProvider); + // Default duration to track stopped containers on nodemanager is 10Min. // This should not be assigned very large value as it will remember all the // containers stopped during that time. @@ -856,6 +867,43 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } + /** + * Returns a handler based on the configured node attributes provider. + * returns null if no provider is configured. + * @param provider + * @return attributes handler + */ + private NMNodeAttributesHandler createNMNodeAttributesHandler( + NodeAttributesProvider provider) { + return provider == null ? null : + new NMDistributedNodeAttributesHandler(nodeAttributesProvider); + } + + private interface NMNodeAttributesHandler { + + /** + * @return the node attributes of this node manager. + */ + Set getNodeAttributesForHeartbeat(); + } + + private static class NMDistributedNodeAttributesHandler + implements NMNodeAttributesHandler { + + private final NodeAttributesProvider attributesProvider; + + protected NMDistributedNodeAttributesHandler( + NodeAttributesProvider provider) { + this.attributesProvider = provider; + } + + @Override + public Set getNodeAttributesForHeartbeat() { + return attributesProvider.getDescriptors(); + } + } + + private static interface NMNodeLabelsHandler { /** * validates nodeLabels From Provider and returns it to the caller. Also @@ -1071,6 +1119,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements NodeHeartbeatResponse response = null; Set nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); + Set nodeAttributesForHeartbeat = + nodeAttributesHandler == null ? null : + nodeAttributesHandler.getNodeAttributesForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, @@ -1079,6 +1130,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey(), nodeLabelsForHeartbeat, + nodeAttributesForHeartbeat, NodeStatusUpdaterImpl.this.context .getRegisteringCollectors()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java new file mode 100644 index 00000000000..74341eb7d7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.TimerTask; +import java.util.Set; + +/** + * Configuration based node attributes provider. + */ +public class ConfigurationNodeAttributesProvider + extends NodeAttributesProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class); + + public ConfigurationNodeAttributesProvider() { + super("Configuration Based Node Attributes Provider"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + long taskInterval = conf.getLong(YarnConfiguration + .NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, + YarnConfiguration + .DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS); + this.setIntervalTime(taskInterval); + super.serviceInit(conf); + } + + private void updateNodeAttributesFromConfig(Configuration conf) + throws IOException { + String configuredNodeAttributes = conf.get( + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, null); + setDescriptors(parseAttributes(configuredNodeAttributes)); + } + + // TODO parse attributes from configuration + @VisibleForTesting + public Set parseAttributes(String config) + throws IOException { + return new HashSet<>(); + } + + private class ConfigurationMonitorTimerTask extends TimerTask { + @Override + public void run() { + try { + updateNodeAttributesFromConfig(new YarnConfiguration()); + } catch (Exception e) { + LOG.error("Failed to update node attributes from " + + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, e); + } + } + } + + @Override + protected void cleanUp() throws Exception { + // Nothing to cleanup + } + + @Override + public TimerTask createTimerTask() { + return new ConfigurationMonitorTimerTask(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java index b31215b0f3d..b2c2f6ee9a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java @@ -160,7 +160,7 @@ public class TestNodeManager { // With valid whitelisted configurations conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, - YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER); + YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER); labelsProviderService = nodeManager.createNodeLabelsProvider(conf); Assert.assertNotNull("LabelsProviderService should be initialized When " + "node labels provider class is configured", labelsProviderService); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java index 7ef23cbbd1c..3e2d963e898 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -225,11 +225,10 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker, - NodeLabelsProvider labelsProvider) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, labelsProvider) { + metrics) { @Override protected ResourceTracker getRMClient() { return resourceTracker; @@ -325,11 +324,10 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker, - NodeLabelsProvider labelsProvider) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, labelsProvider) { + metrics) { @Override protected ResourceTracker getRMClient() { return resourceTracker; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java new file mode 100644 index 00000000000..54cc8f0fd0d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.BeforeClass; +import org.junit.Before; +import org.junit.AfterClass; +import org.junit.After; +import org.junit.Test; +import org.junit.Assert; + +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; + +/** + * Test class for node configuration node attributes provider. + */ +public class TestConfigurationNodeAttributesProvider { + + private static File testRootDir = new File("target", + TestConfigurationNodeAttributesProvider.class.getName() + "-localDir") + .getAbsoluteFile(); + + private ConfigurationNodeAttributesProvider nodeAttributesProvider; + + @BeforeClass + public static void create() { + testRootDir.mkdirs(); + } + + @Before + public void setup() { + nodeAttributesProvider = new ConfigurationNodeAttributesProvider(); + } + + @After + public void tearDown() throws Exception { + if (nodeAttributesProvider != null) { + nodeAttributesProvider.close(); + nodeAttributesProvider.stop(); + } + } + + @AfterClass + public static void remove() throws Exception { + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext() + .delete(new Path(testRootDir.getAbsolutePath()), true); + } + } + + @Test(timeout=30000L) + public void testNodeAttributesFetchInterval() + throws IOException, InterruptedException { + Set expectedAttributes1 = new HashSet<>(); + expectedAttributes1.add(NodeAttribute + .newInstance("test.io", "host", + NodeAttributeType.STRING, "host1")); + + Configuration conf = new Configuration(); + // Set fetch interval to 1s for testing + conf.setLong( + YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, 1000); + ConfigurationNodeAttributesProvider spyProvider = + Mockito.spy(nodeAttributesProvider); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes1); + + spyProvider.init(conf); + spyProvider.start(); + + // Verify init value is honored. + Assert.assertEquals(expectedAttributes1, spyProvider.getDescriptors()); + + // Configuration provider provides a different set of attributes. + Set expectedAttributes2 = new HashSet<>(); + expectedAttributes2.add(NodeAttribute + .newInstance("test.io", "os", + NodeAttributeType.STRING, "windows")); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes2); + + // Since we set fetch interval to 1s, it needs to wait for 1s until + // the updated attributes is updated to the provider. So we are expecting + // to see some old values for a short window. + ArrayList keysMet = new ArrayList<>(); + int numOfOldValue = 0; + int numOfNewValue = 0; + // Run 5 times in 500ms interval + int times=5; + while(times>0) { + Set current = spyProvider.getDescriptors(); + Assert.assertEquals(1, current.size()); + String attributeName = current.iterator().next().getAttributeName(); + if ("host".equals(attributeName)){ + numOfOldValue++; + } else if ("os".equals(attributeName)) { + numOfNewValue++; + } + Thread.sleep(500); + times--; + } + // We should either see the old value or the new value. + Assert.assertEquals(5, numOfNewValue + numOfOldValue); + // Both values should be more than 0. + Assert.assertTrue(numOfOldValue > 0); + Assert.assertTrue(numOfNewValue > 0); + } + + @Test + public void testDisableFetchNodeAttributes() throws IOException, + InterruptedException { + Set expectedAttributes1 = new HashSet<>(); + expectedAttributes1.add(NodeAttribute + .newInstance("test.io", "host", + NodeAttributeType.STRING, "host1")); + + Configuration conf = new Configuration(); + // Set fetch interval to -1 to disable refresh. + conf.setLong( + YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1); + ConfigurationNodeAttributesProvider spyProvider = + Mockito.spy(nodeAttributesProvider); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes1); + spyProvider.init(conf); + spyProvider.start(); + + Assert.assertEquals(expectedAttributes1, + spyProvider.getDescriptors()); + + // The configuration added another attribute, + // as we disabled the fetch interval, this value cannot be + // updated to the provider. + Set expectedAttributes2 = new HashSet<>(); + expectedAttributes2.add(NodeAttribute + .newInstance("test.io", "os", + NodeAttributeType.STRING, "windows")); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes2); + + // Wait a few seconds until we get the value update, expecting a failure. + try { + GenericTestUtils.waitFor(() -> { + Set attributes = spyProvider.getDescriptors(); + return "os".equalsIgnoreCase(attributes + .iterator().next().getAttributeName()); + }, 500, 1000); + } catch (Exception e) { + // Make sure we get the timeout exception. + Assert.assertTrue(e instanceof TimeoutException); + return; + } + + Assert.fail("Expecting a failure in previous check!"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index e9971928117..8a1a9a7ff04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.collections.CollectionUtils; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -646,6 +648,34 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getNodeManagerQueueLimitCalculator() .createContainerQueuingLimit()); } + + // 8. Get node's attributes and update node-to-attributes mapping + // in RMNodeAttributeManager. + Set nodeAttributes = request.getNodeAttributes(); + if (nodeAttributes != null && !nodeAttributes.isEmpty()) { + nodeAttributes.forEach(nodeAttribute -> + LOG.debug(nodeId.toString() + " ATTRIBUTE : " + + nodeAttribute.toString())); + + // Validate attributes + if (!nodeAttributes.stream().allMatch( + nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED + .equals(nodeAttribute.getAttributePrefix()))) { + // All attributes must be in same prefix: nm.yarn.io. + // Since we have the checks in NM to make sure attributes reported + // in HB are with correct prefix, so it should not reach here. + LOG.warn("Reject invalid node attributes from host: " + + nodeId.toString() + ", attributes in HB must have prefix " + + NodeAttribute.PREFIX_DISTRIBUTED); + } else { + // Replace all distributed node attributes associated with this host + // with the new reported attributes in node attribute manager. + this.rmContext.getNodeAttributesManager() + .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, + ImmutableMap.of(nodeId.getHost(), nodeAttributes)); + } + } + return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index a902ac68fdd..04d74a87f5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import com.google.common.base.Strings; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -126,7 +127,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { private void internalUpdateAttributesOnNodes( Map> nodeAttributeMapping, AttributeMappingOperationType op, - Map newAttributesToBeAdded) { + Map newAttributesToBeAdded, + String attributePrefix) { try { writeLock.lock(); @@ -156,8 +158,9 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { break; case REPLACE: clusterAttributes.putAll(newAttributesToBeAdded); - replaceNodeToAttribute(nodeHost, node.getAttributes(), attributes); - node.replaceAttributes(attributes); + replaceNodeToAttribute(nodeHost, attributePrefix, + node.getAttributes(), attributes); + node.replaceAttributes(attributes, attributePrefix); break; default: break; @@ -199,15 +202,23 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { private void addNodeToAttribute(String nodeHost, Map attributeMappings) { for (NodeAttribute attribute : attributeMappings.keySet()) { - clusterAttributes.get(attribute).addNode(nodeHost); + RMNodeAttribute rmNodeAttribute = clusterAttributes.get(attribute); + if (rmNodeAttribute != null) { + rmNodeAttribute.addNode(nodeHost); + } else { + clusterAttributes.put(attribute, new RMNodeAttribute(attribute)); + } } } - private void replaceNodeToAttribute(String nodeHost, + private void replaceNodeToAttribute(String nodeHost, String prefix, Map oldAttributeMappings, Map newAttributeMappings) { if (oldAttributeMappings != null) { - removeNodeFromAttributes(nodeHost, oldAttributeMappings.keySet()); + Set toRemoveAttributes = + NodeLabelUtil.filterAttributesByPrefix( + oldAttributeMappings.keySet(), prefix); + removeNodeFromAttributes(nodeHost, toRemoveAttributes); } addNodeToAttribute(nodeHost, newAttributeMappings); } @@ -432,8 +443,19 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { } public void replaceAttributes( - Map attributesMapping) { - this.attributes.clear(); + Map attributesMapping, String prefix) { + if (Strings.isNullOrEmpty(prefix)) { + this.attributes.clear(); + } else { + Iterator> it = + this.attributes.entrySet().iterator(); + while (it.hasNext()) { + Entry current = it.next(); + if (prefix.equals(current.getKey().getAttributePrefix())) { + it.remove(); + } + } + } this.attributes.putAll(attributesMapping); } @@ -506,9 +528,10 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { } @Override - public void replaceNodeAttributes( + public void replaceNodeAttributes(String prefix, Map> nodeAttributeMapping) throws IOException { - processMapping(nodeAttributeMapping, AttributeMappingOperationType.REPLACE); + processMapping(nodeAttributeMapping, + AttributeMappingOperationType.REPLACE, prefix); } @Override @@ -526,12 +549,19 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { private void processMapping( Map> nodeAttributeMapping, AttributeMappingOperationType mappingType) throws IOException { + processMapping(nodeAttributeMapping, mappingType, null); + } + + private void processMapping( + Map> nodeAttributeMapping, + AttributeMappingOperationType mappingType, String attributePrefix) + throws IOException { Map newAttributesToBeAdded = new HashMap<>(); Map> validMapping = validate(nodeAttributeMapping, newAttributesToBeAdded, false); internalUpdateAttributesOnNodes(validMapping, mappingType, - newAttributesToBeAdded); + newAttributesToBeAdded, attributePrefix); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index fa0f5fd7a46..a29e8a2bd07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -37,6 +37,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.HashSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.xml.parsers.DocumentBuilderFactory; @@ -64,12 +65,16 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.nodelabels.AttributeValue; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -817,6 +822,79 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm.stop(); } + @Test + public void testNodeHeartbeatWithNodeAttributes() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + // Register to RM + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + Set nodeAttributes = new HashSet<>(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host2")); + + // Set node attributes in HB. + NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); + int responseId = nodeStatusObject.getResponseId(); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Ensure RM gets correct node attributes update. + NodeAttributesManager attributeManager = + rm.getRMContext().getNodeAttributesManager(); + Map attrs = attributeManager + .getAttributesForNode(nodeId.getHost()); + Assert.assertEquals(1, attrs.size()); + NodeAttribute na = attrs.keySet().iterator().next(); + Assert.assertEquals("host", na.getAttributeName()); + Assert.assertEquals("host2", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + + + // Send another HB to RM with updated node atrribute + nodeAttributes.clear(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host3")); + nodeStatusObject = getNodeStatusObject(nodeId); + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM gets the updated attribute + attrs = attributeManager.getAttributesForNode(nodeId.getHost()); + Assert.assertEquals(1, attrs.size()); + na = attrs.keySet().iterator().next(); + Assert.assertEquals("host", na.getAttributeName()); + Assert.assertEquals("host3", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + } + @Test public void testNodeHeartBeatWithInvalidLabels() throws Exception { writeToHostsFile("host2"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java index b639a741c65..07968d41ef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.nodelabels.AttributeValue; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.junit.Test; import org.junit.Before; import org.junit.After; @@ -255,4 +257,101 @@ public class TestNodeAttributesManager { .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0])); Assert.assertEquals(2, allAttributesPerPrefix.size()); } + + @Test + public void testReplaceNodeAttributes() throws IOException { + Map> toAddAttributes = new HashMap<>(); + Map> toReplaceMap = new HashMap<>(); + Map nodeAttributes; + Set filteredAttributes; + Set clusterAttributes; + + // Add 3 attributes to host1 + // yarn.test1.io/A1=host1_v1_1 + // yarn.test1.io/A2=host1_v1_2 + // yarn.test1.io/A3=host1_v1_3 + toAddAttributes.put(HOSTNAMES[0], + createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1")); + + attributesManager.addNodeAttributes(toAddAttributes); + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(3, nodeAttributes.size()); + + // Add 10 distributed node attributes to host1 + // nn.yarn.io/dist-node-attribute1=dist_v1_1 + // nn.yarn.io/dist-node-attribute2=dist_v1_2 + // ... + // nn.yarn.io/dist-node-attribute10=dist_v1_10 + toAddAttributes.clear(); + toAddAttributes.put(HOSTNAMES[0], + createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, + 10, "dist-node-attribute", "dist_v1")); + attributesManager.addNodeAttributes(toAddAttributes); + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(13, nodeAttributes.size()); + clusterAttributes = attributesManager.getClusterNodeAttributes( + Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0])); + Assert.assertEquals(13, clusterAttributes.size()); + + // Replace by prefix + // Same distributed attributes names, but different values. + Set toReplaceAttributes = + createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 5, + "dist-node-attribute", "dist_v2"); + + attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, + ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes)); + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(8, nodeAttributes.size()); + clusterAttributes = attributesManager.getClusterNodeAttributes( + Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0])); + Assert.assertEquals(8, clusterAttributes.size()); + + // Now we have 5 distributed attributes + filteredAttributes = NodeLabelUtil.filterAttributesByPrefix( + nodeAttributes.keySet(), NodeAttribute.PREFIX_DISTRIBUTED); + Assert.assertEquals(5, filteredAttributes.size()); + // Values are updated to have prefix dist_v2 + Assert.assertTrue(filteredAttributes.stream().allMatch( + nodeAttribute -> + nodeAttribute.getAttributeValue().startsWith("dist_v2"))); + + // We still have 3 yarn.test1.io attributes + filteredAttributes = NodeLabelUtil.filterAttributesByPrefix( + nodeAttributes.keySet(), PREFIXES[0]); + Assert.assertEquals(3, filteredAttributes.size()); + + // Replace with prefix + // Different attribute names + toReplaceAttributes = + createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 1, + "dist-node-attribute-v2", "dist_v3"); + attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, + ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes)); + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(4, nodeAttributes.size()); + clusterAttributes = attributesManager.getClusterNodeAttributes( + Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED)); + Assert.assertEquals(1, clusterAttributes.size()); + NodeAttribute att = clusterAttributes.iterator().next(); + Assert.assertEquals("dist-node-attribute-v2_0", att.getAttributeName()); + Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED, + att.getAttributePrefix()); + Assert.assertEquals("dist_v3_0", att.getAttributeValue()); + + // Replace all attributes + toReplaceMap.put(HOSTNAMES[0], + createAttributesForTest(PREFIXES[1], 2, "B", "B_v1")); + attributesManager.replaceNodeAttributes(null, toReplaceMap); + + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(2, nodeAttributes.size()); + clusterAttributes = attributesManager + .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[1])); + Assert.assertEquals(2, clusterAttributes.size()); + clusterAttributes = attributesManager + .getClusterNodeAttributes(Sets.newHashSet( + NodeAttribute.PREFIX_DISTRIBUTED)); + Assert.assertEquals(0, clusterAttributes.size()); + } }