diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttribute.java index 4f6846b5c44..25ac9abd596 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttribute.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttribute.java @@ -45,13 +45,12 @@ import org.apache.hadoop.yarn.util.Records; @Unstable public abstract class NodeAttribute { - public static final String DEFAULT_PREFIX = ""; public static final String PREFIX_DISTRIBUTED = "nm.yarn.io"; public static final String PREFIX_CENTRALIZED = "rm.yarn.io"; public static NodeAttribute newInstance(String attributeName, NodeAttributeType attributeType, String attributeValue) { - return newInstance(DEFAULT_PREFIX, attributeName, attributeType, + return newInstance(PREFIX_CENTRALIZED, attributeName, attributeType, attributeValue); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeAttributesCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeAttributesCLI.java index 2eff155ee96..df5a57dff70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeAttributesCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeAttributesCLI.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.client.cli; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappin import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; /** * CLI to map attributes to Nodes. @@ -311,7 +314,7 @@ public class NodeAttributesCLI extends Configured implements Tool { */ private List buildNodeLabelsMapFromStr(String args, boolean validateForAttributes, AttributeMappingOperationType operation) { - List nodeToAttributesList = new ArrayList<>(); + Map nodeToAttributesMap = new HashMap<>(); for (String nodeToAttributesStr : args.split("[ \n]")) { // for each node to attribute mapping nodeToAttributesStr = nodeToAttributesStr.trim(); @@ -384,8 +387,9 @@ public class NodeAttributesCLI extends Configured implements Tool { // TODO when we support different type of attribute type we need to // cross verify whether input attributes itself is not violating // attribute Name to Type mapping. - attributesList.add(NodeAttribute.newInstance(attributeName.trim(), - attributeType, attributeValue.trim())); + attributesList + .add(NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, + attributeName.trim(), attributeType, attributeValue.trim())); } } if (validateForAttributes) { @@ -393,14 +397,14 @@ public class NodeAttributesCLI extends Configured implements Tool { "Attributes cannot be null or empty for Operation " + operation.name() + " on the node " + node); } - nodeToAttributesList - .add(NodeToAttributes.newInstance(node, attributesList)); + nodeToAttributesMap + .put(node,NodeToAttributes.newInstance(node, attributesList)); } - if (nodeToAttributesList.isEmpty()) { + if (nodeToAttributesMap.isEmpty()) { throw new IllegalArgumentException(NO_MAPPING_ERR_MSG); } - return nodeToAttributesList; + return Lists.newArrayList(nodeToAttributesMap.values()); } public static void main(String[] args) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestNodeAttributesCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestNodeAttributesCLI.java index cc92a93eaf8..bbd5ca34334 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestNodeAttributesCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestNodeAttributesCLI.java @@ -301,6 +301,24 @@ public class TestNodeAttributesCLI { NodesToAttributesMappingRequest.newInstance( AttributeMappingOperationType.ADD, nodeAttributesList, true); assertTrue(request.equals(expected)); + + // -------------------------------- + // with Duplicate mappings for a host + // -------------------------------- + args = new String[] { "-add", "x:key2=123,key3=abc x:key4(string)", + "-failOnUnknownNodes" }; + assertTrue("Should not fail as attribute has been properly mapped", + 0 == runTool(args)); + nodeAttributesList = new ArrayList<>(); + attributes = new ArrayList<>(); + attributes + .add(NodeAttribute.newInstance("key4", NodeAttributeType.STRING, "")); + nodeAttributesList.add(NodeToAttributes.newInstance("x", attributes)); + + expected = + NodesToAttributesMappingRequest.newInstance( + AttributeMappingOperationType.ADD, nodeAttributesList, true); + assertTrue(request.equals(expected)); } private void assertFailureMessageContains(String... messages) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index bff3f2c5ad5..12f8aaf5df8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -23,6 +23,8 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +52,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.HAUtil; @@ -59,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; @@ -66,6 +70,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; @@ -968,7 +973,99 @@ public class AdminService extends CompositeService implements public NodesToAttributesMappingResponse mapAttributesToNodes( NodesToAttributesMappingRequest request) throws YarnException, IOException { + + final String operation = "mapAttributesToNodes"; + final String msg = "Map Attributes to Nodes"; + UserGroupInformation user = checkAcls(operation); + checkRMStatus(user.getShortUserName(), operation, msg); + + + List nodesToAttributes = request.getNodesToAttributes(); + boolean failOnUnknownNodes = request.getFailOnUnknownNodes(); + + Map> nodeAttributeMapping = + validateAndFetch(nodesToAttributes, failOnUnknownNodes); + + NodeAttributesManager nodeAttributesManager = + rm.getRMContext().getNodeAttributesManager(); + try { + switch (request.getOperation()) { + case ADD: + nodeAttributesManager.addNodeAttributes(nodeAttributeMapping); + break; + case REMOVE: + nodeAttributesManager.removeNodeAttributes(nodeAttributeMapping); + break; + case REPLACE: + nodeAttributesManager.replaceNodeAttributes( + NodeAttribute.PREFIX_CENTRALIZED, nodeAttributeMapping); + break; + default: + throw new IOException("Invalid operation " + request.getOperation() + + " specified in the mapAttributesToNodes request "); + + } + } catch (IOException ioe) { + throw logAndWrapException(ioe, user.getShortUserName(), operation, msg); + } + RMAuditLogger.logSuccess(user.getShortUserName(), operation, + "AdminService"); return recordFactory .newRecordInstance(NodesToAttributesMappingResponse.class); } + + /** + * @param nodesToAttributesMapping input to be validated + * @param failOnUnknownNodes indicates to fail if the nodes are not available. + * @return the map of Node host name to set of NodeAttributes + * @throws IOException if validation fails for node existence or the attribute + * has a wrong prefix + */ + private Map> validateAndFetch( + List nodesToAttributesMapping, + boolean failOnUnknownNodes) throws IOException { + Map> attributeMapping = new HashMap<>(); + List invalidNodes = new ArrayList<>(); + for (NodeToAttributes nodeToAttributes : nodesToAttributesMapping) { + String node = nodeToAttributes.getNode(); + if (!validateForInvalidNode(node, failOnUnknownNodes)) { + invalidNodes.add(node); + continue; + } + List nodeAttributes = nodeToAttributes.getNodeAttributes(); + if (!nodeAttributes.stream() + .allMatch(nodeAttribute -> NodeAttribute.PREFIX_CENTRALIZED + .equals(nodeAttribute.getAttributePrefix()))) { + throw new IOException("Invalid Attribute Mapping for the node " + node + + ". Prefix should be " + NodeAttribute.PREFIX_CENTRALIZED); + } + attributeMapping.put(node, new HashSet<>(nodeAttributes)); + } + if (!invalidNodes.isEmpty()) { + String message = " Following nodes does not exist : " + invalidNodes; + LOG.error(message); + throw new IOException(message); + } + return attributeMapping; + } + + /** + * @param node + * @return true if valid else false; + */ + private boolean validateForInvalidNode(String node, + boolean failOnUnknownNodes) { + if (!failOnUnknownNodes) { + return true; + } + // both active and inactive nodes are recognized as known nodes + boolean isKnown = rm.getRMContext().getRMNodes().keySet().stream() + .anyMatch(activeNode -> activeNode.getHost().equals(node)); + + if (!isKnown) { + isKnown = rm.getRMContext().getInactiveRMNodes().keySet().stream() + .anyMatch(inactiveNode -> inactiveNode.getHost().equals(node)); + } + return isKnown; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index f0484e122a9..90945c2a5e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.records.DecommissionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; @@ -60,6 +62,9 @@ import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -85,11 +90,14 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; import static org.junit.Assert.assertTrue; @@ -1203,21 +1211,7 @@ public class TestRMAdminService { ((RMContextImpl) rm.getRMContext()) .setHAServiceState(HAServiceState.ACTIVE); - Map rmNodes = rm.getRMContext().getRMNodes(); - rmNodes.put(NodeId.newInstance("host1", 1111), - new RMNodeImpl(null, rm.getRMContext(), "host1", 0, 0, null, null, - null)); - rmNodes.put(NodeId.newInstance("host2", 2222), - new RMNodeImpl(null, rm.getRMContext(), "host2", 0, 0, null, null, - null)); - rmNodes.put(NodeId.newInstance("host3", 3333), - new RMNodeImpl(null, rm.getRMContext(), "host3", 0, 0, null, null, - null)); - Map rmInactiveNodes = rm.getRMContext() - .getInactiveRMNodes(); - rmInactiveNodes.put(NodeId.newInstance("host4", 4444), - new RMNodeImpl(null, rm.getRMContext(), "host4", 0, 0, null, null, - null)); + setActiveAndInactiveNodes(rm); RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); // by default, distributed configuration for node label is disabled, this @@ -1552,4 +1546,150 @@ public class TestRMAdminService { Assert.assertTrue( response.getNodeLabelList().containsAll(Arrays.asList(labelX, labelY))); } + + @Test(timeout = 30000) + public void testMapAttributesToNodes() throws Exception, YarnException { + // 1. Need to test for the Invalid Node + // 1.1. Need to test for active nodes + // 1.2. Need to test for Inactive nodes + // 1.3. Test with Single Node invalid + // 1.4. Need to test with port (should fail) + // 1.5. Test with unknown node when failOnUnknownNodes is false + + // also test : 3. Ensure Appropriate manager Method call is done + rm = new MockRM(); + + NodeAttributesManager spiedAttributesManager = + Mockito.spy(rm.getRMContext().getNodeAttributesManager()); + rm.getRMContext().setNodeAttributesManager(spiedAttributesManager); + + ((RMContextImpl) rm.getRMContext()) + .setHAServiceState(HAServiceState.ACTIVE); + setActiveAndInactiveNodes(rm); + // by default, distributed configuration for node label is disabled, this + // should pass + NodesToAttributesMappingRequest request = + NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.ADD, + ImmutableList.of(NodeToAttributes.newInstance("host1", + ImmutableList.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_CENTRALIZED, "x", + NodeAttributeType.STRING, "dfasdf")))), + true); + try { + rm.adminService.mapAttributesToNodes(request); + } catch (Exception ex) { + fail("should not fail on known node in active state" + ex.getMessage()); + } + Mockito.verify(spiedAttributesManager, Mockito.times(1)) + .addNodeAttributes(Mockito.anyMap()); + + request = + NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.REMOVE, + ImmutableList.of(NodeToAttributes.newInstance("host4", + ImmutableList.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_CENTRALIZED, "x", + NodeAttributeType.STRING, "dfasdf")))), + true); + try { + rm.adminService.mapAttributesToNodes(request); + } catch (Exception ex) { + fail("should not fail on known node in inactive state" + ex.getMessage()); + } + Mockito.verify(spiedAttributesManager, Mockito.times(1)) + .removeNodeAttributes(Mockito.anyMap()); + + request = + NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.ADD, + ImmutableList.of(NodeToAttributes.newInstance("host5", + ImmutableList.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_CENTRALIZED, "x", + NodeAttributeType.STRING, "dfasdf")))), + true); + try { + rm.adminService.mapAttributesToNodes(request); + fail("host5 is not a valid node, It should have failed"); + } catch (Exception ex) { + Assert.assertEquals("Exception Message is not as desired", + " Following nodes does not exist : [host5]", ex.getMessage()); + } + + request = + NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.ADD, ImmutableList.of( + NodeToAttributes.newInstance("host4:8889", + ImmutableList.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_CENTRALIZED, "x", + NodeAttributeType.STRING, "dfasdf"))), + NodeToAttributes.newInstance("host2:8889", + ImmutableList.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_CENTRALIZED, "x", + NodeAttributeType.STRING, "dfasdf")))), + true); + try { + // port if added in CLI it fails in the client itself. Here we just check + // against hostname hence the message as : nodes does not exist. + rm.adminService.mapAttributesToNodes(request); + fail("host with the port should fail as only hostnames are validated"); + } catch (Exception ex) { + Assert.assertEquals("Exception Message is not as desired", + " Following nodes does not exist : [host4:8889, host2:8889]", + ex.getMessage()); + } + + request = + NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.REPLACE, + ImmutableList.of(NodeToAttributes.newInstance("host5", + ImmutableList.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_CENTRALIZED, "x", + NodeAttributeType.STRING, "dfasdf")))), + false); + try { + rm.adminService.mapAttributesToNodes(request); + } catch (Exception ex) { + fail("This operation should not fail as failOnUnknownNodes is false : " + + ex.getMessage()); + } + Mockito.verify(spiedAttributesManager, Mockito.times(1)) + .replaceNodeAttributes(Mockito.eq(NodeAttribute.PREFIX_CENTRALIZED), + Mockito.anyMap()); + + // 2. fail on invalid prefix + request = + NodesToAttributesMappingRequest + .newInstance(AttributeMappingOperationType.ADD, + ImmutableList.of(NodeToAttributes.newInstance("host5", + ImmutableList.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "x", + NodeAttributeType.STRING, "dfasdf")))), + false); + try { + rm.adminService.mapAttributesToNodes(request); + fail("This operation should fail as prefix should be \"nm.yarn.io\"."); + } catch (Exception ex) { + Assert.assertEquals("Exception Message is not as desired", + "Invalid Attribute Mapping for the node host5. Prefix should be " + + "rm.yarn.io", + ex.getMessage()); + } + + rm.close(); + } + + private void setActiveAndInactiveNodes(ResourceManager resourceManager) { + Map rmNodes = resourceManager.getRMContext().getRMNodes(); + rmNodes.put(NodeId.newInstance("host1", 1111), new RMNodeImpl(null, + resourceManager.getRMContext(), "host1", 0, 0, null, null, null)); + rmNodes.put(NodeId.newInstance("host2", 2222), new RMNodeImpl(null, + resourceManager.getRMContext(), "host2", 0, 0, null, null, null)); + rmNodes.put(NodeId.newInstance("host3", 3333), new RMNodeImpl(null, + resourceManager.getRMContext(), "host3", 0, 0, null, null, null)); + Map rmInactiveNodes = + resourceManager.getRMContext().getInactiveRMNodes(); + rmInactiveNodes.put(NodeId.newInstance("host4", 4444), new RMNodeImpl(null, + resourceManager.getRMContext(), "host4", 0, 0, null, null, null)); + } }