From 440ff7f563df5e7db72dce020c3c3dc379f88c91 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Sat, 31 Mar 2018 19:53:06 +0530 Subject: [PATCH] YARN-8094. Support configuration based Node Attribute provider. Contributed by Weiwei Yang. --- .../ConfigurationNodeAttributesProvider.java | 70 +++++++++++++++++- .../ScriptBasedNodeAttributesProvider.java | 8 ++ ...stConfigurationNodeAttributesProvider.java | 74 +++++++++++++++++++ 3 files changed, 150 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java index 74341eb7d7c..ab8a8b1cd0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java @@ -18,13 +18,19 @@ package org.apache.hadoop.yarn.server.nodemanager.nodelabels; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.EnumUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.TimerTask; import java.util.Set; @@ -38,6 +44,9 @@ public class ConfigurationNodeAttributesProvider private static final Logger LOG = LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class); + private static final String NODE_ATTRIBUTES_DELIMITER = ":"; + private static final String NODE_ATTRIBUTE_DELIMITER = ","; + public ConfigurationNodeAttributesProvider() { super("Configuration Based Node Attributes Provider"); } @@ -59,11 +68,68 @@ public class ConfigurationNodeAttributesProvider setDescriptors(parseAttributes(configuredNodeAttributes)); } - // TODO parse attributes from configuration @VisibleForTesting public Set parseAttributes(String config) throws IOException { - return new HashSet<>(); + if (Strings.isNullOrEmpty(config)) { + return ImmutableSet.of(); + } + Set attributeSet = new HashSet<>(); + // Configuration value should be in one line, format: + // "ATTRIBUTE_NAME,ATTRIBUTE_TYPE,ATTRIBUTE_VALUE", + // multiple node-attributes are delimited by ":". + // Each attribute str should not container any space. + String[] attributeStrs = config.split(NODE_ATTRIBUTES_DELIMITER); + for (String attributeStr : attributeStrs) { + String[] fields = attributeStr.split(NODE_ATTRIBUTE_DELIMITER); + if (fields.length != 3) { + throw new IOException("Invalid value for " + + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES + + "=" + config); + } + + // We don't allow user config to overwrite our dist prefix, + // so disallow any prefix set in the configuration. + if (fields[0].contains("/")) { + throw new IOException("Node attribute set in " + + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES + + " should not contain any prefix."); + } + + // Make sure attribute type is valid. + if (!EnumUtils.isValidEnum(NodeAttributeType.class, fields[1])) { + throw new IOException("Invalid node attribute type: " + + fields[1] + ", valid values are " + + Arrays.asList(NodeAttributeType.values())); + } + + // Automatically setup prefix for collected attributes + NodeAttribute na = NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, + fields[0], + NodeAttributeType.valueOf(fields[1]), + fields[2]); + + // Since a NodeAttribute is identical with another one as long as + // their prefix and name are same, to avoid attributes getting + // overwritten by ambiguous attribute, make sure it fails in such + // case. + if (!attributeSet.add(na)) { + throw new IOException("Ambiguous node attribute is found: " + + na.toString() + ", a same attribute already exists"); + } + } + + // Before updating the attributes to the provider, + // verify if they are valid + try { + NodeLabelUtil.validateNodeAttributes(attributeSet); + } catch (IOException e) { + throw new IOException("Node attributes set by configuration property: " + + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES + + " is not valid. Detail message: " + e.getMessage()); + } + return attributeSet; } private class ConfigurationMonitorTimerTask extends TimerTask { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ScriptBasedNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ScriptBasedNodeAttributesProvider.java index 46214348997..7e5aefc04c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ScriptBasedNodeAttributesProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ScriptBasedNodeAttributesProvider.java @@ -117,6 +117,14 @@ public class ScriptBasedNodeAttributesProvider extends NodeAttributesProvider{ + NODE_ATTRIBUTE_DELIMITER + "ATTRIBUTE_VALUE; but get " + nodeAttribute); } + + // We don't allow script to overwrite our dist prefix, + // so disallow any prefix set in the script. + if (attributeStrs[0].contains("/")) { + throw new IOException("Node attributes reported by script" + + " should not contain any prefix."); + } + // Automatically setup prefix for collected attributes NodeAttribute na = NodeAttribute .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java index 54cc8f0fd0d..d4384b47aa5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java @@ -36,6 +36,7 @@ import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import java.util.ArrayList; import java.util.concurrent.TimeoutException; @@ -182,4 +183,77 @@ public class TestConfigurationNodeAttributesProvider { Assert.fail("Expecting a failure in previous check!"); } + + @Test + public void testFetchAttributesFromConfiguration() { + Configuration conf = new Configuration(); + // Set fetch interval to -1 to disable refresh. + conf.setLong( + YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1); + conf.setStrings( + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, ""); + } + + @Test + public void testParseConfiguration() throws IOException { + // ATTRIBUTE_NAME,ATTRIBUTE_TYPE,ATTRIBUTE_VALUE + String attributesStr = "hostname,STRING,host1234:uptime,STRING,321543"; + Set attributes = nodeAttributesProvider + .parseAttributes(attributesStr); + Assert.assertEquals(2, attributes.size()); + Iterator ait = attributes.iterator(); + + while(ait.hasNext()) { + NodeAttribute at = ait.next(); + if (at.getAttributeName().equals("hostname")) { + Assert.assertEquals("hostname", at.getAttributeName()); + Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED, + at.getAttributePrefix()); + Assert.assertEquals(NodeAttributeType.STRING, + at.getAttributeType()); + Assert.assertEquals("host1234", at.getAttributeValue()); + } else if (at.getAttributeName().equals("uptime")) { + Assert.assertEquals("uptime", at.getAttributeName()); + Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED, + at.getAttributePrefix()); + Assert.assertEquals(NodeAttributeType.STRING, + at.getAttributeType()); + Assert.assertEquals("321543", at.getAttributeValue()); + } else { + Assert.fail("Unexpected attribute"); + } + } + // Missing type + attributesStr = "hostname,host1234"; + try { + nodeAttributesProvider.parseAttributes(attributesStr); + Assert.fail("Expecting a parsing failure"); + } catch (IOException e) { + Assert.assertNotNull(e); + Assert.assertTrue(e.getMessage().contains("Invalid value")); + } + + // Extra prefix + attributesStr = "prefix/hostname,STRING,host1234"; + try { + nodeAttributesProvider.parseAttributes(attributesStr); + Assert.fail("Expecting a parsing failure"); + } catch (IOException e) { + Assert.assertNotNull(e); + Assert.assertTrue(e.getMessage() + .contains("should not contain any prefix.")); + } + + // Invalid type + attributesStr = "hostname,T,host1234"; + try { + nodeAttributesProvider.parseAttributes(attributesStr); + Assert.fail("Expecting a parsing failure"); + } catch (IOException e) { + e.printStackTrace(); + Assert.assertNotNull(e); + Assert.assertTrue(e.getMessage() + .contains("Invalid node attribute type")); + } + } }