YARN-8033. CLI Integration with NodeAttributesManagerImpl. Contributed by Naganarasimha G R.

This commit is contained in:
bibinchundatt 2018-04-01 19:24:00 +05:30 committed by Sunil G
parent 89b3ebd11e
commit 901e85238d
5 changed files with 282 additions and 24 deletions

View File

@ -45,13 +45,12 @@ import org.apache.hadoop.yarn.util.Records;
@Unstable
public abstract class NodeAttribute {
public static final String DEFAULT_PREFIX = "";
public static final String PREFIX_DISTRIBUTED = "nm.yarn.io";
public static final String PREFIX_CENTRALIZED = "rm.yarn.io";
public static NodeAttribute newInstance(String attributeName,
NodeAttributeType attributeType, String attributeValue) {
return newInstance(DEFAULT_PREFIX, attributeName, attributeType,
return newInstance(PREFIX_CENTRALIZED, attributeName, attributeType,
attributeValue);
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.client.cli;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappin
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* CLI to map attributes to Nodes.
@ -311,7 +314,7 @@ public class NodeAttributesCLI extends Configured implements Tool {
*/
private List<NodeToAttributes> buildNodeLabelsMapFromStr(String args,
boolean validateForAttributes, AttributeMappingOperationType operation) {
List<NodeToAttributes> nodeToAttributesList = new ArrayList<>();
Map<String,NodeToAttributes> nodeToAttributesMap = new HashMap<>();
for (String nodeToAttributesStr : args.split("[ \n]")) {
// for each node to attribute mapping
nodeToAttributesStr = nodeToAttributesStr.trim();
@ -384,8 +387,9 @@ public class NodeAttributesCLI extends Configured implements Tool {
// TODO when we support different type of attribute type we need to
// cross verify whether input attributes itself is not violating
// attribute Name to Type mapping.
attributesList.add(NodeAttribute.newInstance(attributeName.trim(),
attributeType, attributeValue.trim()));
attributesList
.add(NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED,
attributeName.trim(), attributeType, attributeValue.trim()));
}
}
if (validateForAttributes) {
@ -393,14 +397,14 @@ public class NodeAttributesCLI extends Configured implements Tool {
"Attributes cannot be null or empty for Operation "
+ operation.name() + " on the node " + node);
}
nodeToAttributesList
.add(NodeToAttributes.newInstance(node, attributesList));
nodeToAttributesMap
.put(node,NodeToAttributes.newInstance(node, attributesList));
}
if (nodeToAttributesList.isEmpty()) {
if (nodeToAttributesMap.isEmpty()) {
throw new IllegalArgumentException(NO_MAPPING_ERR_MSG);
}
return nodeToAttributesList;
return Lists.newArrayList(nodeToAttributesMap.values());
}
public static void main(String[] args) throws Exception {

View File

@ -301,6 +301,24 @@ public class TestNodeAttributesCLI {
NodesToAttributesMappingRequest.newInstance(
AttributeMappingOperationType.ADD, nodeAttributesList, true);
assertTrue(request.equals(expected));
// --------------------------------
// with Duplicate mappings for a host
// --------------------------------
args = new String[] { "-add", "x:key2=123,key3=abc x:key4(string)",
"-failOnUnknownNodes" };
assertTrue("Should not fail as attribute has been properly mapped",
0 == runTool(args));
nodeAttributesList = new ArrayList<>();
attributes = new ArrayList<>();
attributes
.add(NodeAttribute.newInstance("key4", NodeAttributeType.STRING, ""));
nodeAttributesList.add(NodeToAttributes.newInstance("x", attributes));
expected =
NodesToAttributesMappingRequest.newInstance(
AttributeMappingOperationType.ADD, nodeAttributesList, true);
assertTrue(request.equals(expected));
}
private void assertFailureMessageContains(String... messages) {

View File

@ -23,6 +23,8 @@ import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -50,6 +52,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.HAUtil;
@ -59,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
@ -66,6 +70,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@ -968,7 +973,99 @@ public class AdminService extends CompositeService implements
public NodesToAttributesMappingResponse mapAttributesToNodes(
NodesToAttributesMappingRequest request)
throws YarnException, IOException {
final String operation = "mapAttributesToNodes";
final String msg = "Map Attributes to Nodes";
UserGroupInformation user = checkAcls(operation);
checkRMStatus(user.getShortUserName(), operation, msg);
List<NodeToAttributes> nodesToAttributes = request.getNodesToAttributes();
boolean failOnUnknownNodes = request.getFailOnUnknownNodes();
Map<String, Set<NodeAttribute>> nodeAttributeMapping =
validateAndFetch(nodesToAttributes, failOnUnknownNodes);
NodeAttributesManager nodeAttributesManager =
rm.getRMContext().getNodeAttributesManager();
try {
switch (request.getOperation()) {
case ADD:
nodeAttributesManager.addNodeAttributes(nodeAttributeMapping);
break;
case REMOVE:
nodeAttributesManager.removeNodeAttributes(nodeAttributeMapping);
break;
case REPLACE:
nodeAttributesManager.replaceNodeAttributes(
NodeAttribute.PREFIX_CENTRALIZED, nodeAttributeMapping);
break;
default:
throw new IOException("Invalid operation " + request.getOperation()
+ " specified in the mapAttributesToNodes request ");
}
} catch (IOException ioe) {
throw logAndWrapException(ioe, user.getShortUserName(), operation, msg);
}
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return recordFactory
.newRecordInstance(NodesToAttributesMappingResponse.class);
}
/**
* @param nodesToAttributesMapping input to be validated
* @param failOnUnknownNodes indicates to fail if the nodes are not available.
* @return the map of Node host name to set of NodeAttributes
* @throws IOException if validation fails for node existence or the attribute
* has a wrong prefix
*/
private Map<String, Set<NodeAttribute>> validateAndFetch(
List<NodeToAttributes> nodesToAttributesMapping,
boolean failOnUnknownNodes) throws IOException {
Map<String, Set<NodeAttribute>> attributeMapping = new HashMap<>();
List<String> invalidNodes = new ArrayList<>();
for (NodeToAttributes nodeToAttributes : nodesToAttributesMapping) {
String node = nodeToAttributes.getNode();
if (!validateForInvalidNode(node, failOnUnknownNodes)) {
invalidNodes.add(node);
continue;
}
List<NodeAttribute> nodeAttributes = nodeToAttributes.getNodeAttributes();
if (!nodeAttributes.stream()
.allMatch(nodeAttribute -> NodeAttribute.PREFIX_CENTRALIZED
.equals(nodeAttribute.getAttributePrefix()))) {
throw new IOException("Invalid Attribute Mapping for the node " + node
+ ". Prefix should be " + NodeAttribute.PREFIX_CENTRALIZED);
}
attributeMapping.put(node, new HashSet<>(nodeAttributes));
}
if (!invalidNodes.isEmpty()) {
String message = " Following nodes does not exist : " + invalidNodes;
LOG.error(message);
throw new IOException(message);
}
return attributeMapping;
}
/**
* @param node
* @return true if valid else false;
*/
private boolean validateForInvalidNode(String node,
boolean failOnUnknownNodes) {
if (!failOnUnknownNodes) {
return true;
}
// both active and inactive nodes are recognized as known nodes
boolean isKnown = rm.getRMContext().getRMNodes().keySet().stream()
.anyMatch(activeNode -> activeNode.getHost().equals(node));
if (!isKnown) {
isKnown = rm.getRMContext().getInactiveRMNodes().keySet().stream()
.anyMatch(inactiveNode -> inactiveNode.getHost().equals(node));
}
return isKnown;
}
}

View File

@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
@ -60,6 +62,9 @@ import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -85,11 +90,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import static org.junit.Assert.assertTrue;
@ -1203,21 +1211,7 @@ public class TestRMAdminService {
((RMContextImpl) rm.getRMContext())
.setHAServiceState(HAServiceState.ACTIVE);
Map<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
rmNodes.put(NodeId.newInstance("host1", 1111),
new RMNodeImpl(null, rm.getRMContext(), "host1", 0, 0, null, null,
null));
rmNodes.put(NodeId.newInstance("host2", 2222),
new RMNodeImpl(null, rm.getRMContext(), "host2", 0, 0, null, null,
null));
rmNodes.put(NodeId.newInstance("host3", 3333),
new RMNodeImpl(null, rm.getRMContext(), "host3", 0, 0, null, null,
null));
Map<NodeId, RMNode> rmInactiveNodes = rm.getRMContext()
.getInactiveRMNodes();
rmInactiveNodes.put(NodeId.newInstance("host4", 4444),
new RMNodeImpl(null, rm.getRMContext(), "host4", 0, 0, null, null,
null));
setActiveAndInactiveNodes(rm);
RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
// by default, distributed configuration for node label is disabled, this
@ -1552,4 +1546,150 @@ public class TestRMAdminService {
Assert.assertTrue(
response.getNodeLabelList().containsAll(Arrays.asList(labelX, labelY)));
}
@Test(timeout = 30000)
public void testMapAttributesToNodes() throws Exception, YarnException {
// 1. Need to test for the Invalid Node
// 1.1. Need to test for active nodes
// 1.2. Need to test for Inactive nodes
// 1.3. Test with Single Node invalid
// 1.4. Need to test with port (should fail)
// 1.5. Test with unknown node when failOnUnknownNodes is false
// also test : 3. Ensure Appropriate manager Method call is done
rm = new MockRM();
NodeAttributesManager spiedAttributesManager =
Mockito.spy(rm.getRMContext().getNodeAttributesManager());
rm.getRMContext().setNodeAttributesManager(spiedAttributesManager);
((RMContextImpl) rm.getRMContext())
.setHAServiceState(HAServiceState.ACTIVE);
setActiveAndInactiveNodes(rm);
// by default, distributed configuration for node label is disabled, this
// should pass
NodesToAttributesMappingRequest request =
NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.ADD,
ImmutableList.of(NodeToAttributes.newInstance("host1",
ImmutableList.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf")))),
true);
try {
rm.adminService.mapAttributesToNodes(request);
} catch (Exception ex) {
fail("should not fail on known node in active state" + ex.getMessage());
}
Mockito.verify(spiedAttributesManager, Mockito.times(1))
.addNodeAttributes(Mockito.anyMap());
request =
NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.REMOVE,
ImmutableList.of(NodeToAttributes.newInstance("host4",
ImmutableList.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf")))),
true);
try {
rm.adminService.mapAttributesToNodes(request);
} catch (Exception ex) {
fail("should not fail on known node in inactive state" + ex.getMessage());
}
Mockito.verify(spiedAttributesManager, Mockito.times(1))
.removeNodeAttributes(Mockito.anyMap());
request =
NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.ADD,
ImmutableList.of(NodeToAttributes.newInstance("host5",
ImmutableList.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf")))),
true);
try {
rm.adminService.mapAttributesToNodes(request);
fail("host5 is not a valid node, It should have failed");
} catch (Exception ex) {
Assert.assertEquals("Exception Message is not as desired",
" Following nodes does not exist : [host5]", ex.getMessage());
}
request =
NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.ADD, ImmutableList.of(
NodeToAttributes.newInstance("host4:8889",
ImmutableList.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf"))),
NodeToAttributes.newInstance("host2:8889",
ImmutableList.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf")))),
true);
try {
// port if added in CLI it fails in the client itself. Here we just check
// against hostname hence the message as : nodes does not exist.
rm.adminService.mapAttributesToNodes(request);
fail("host with the port should fail as only hostnames are validated");
} catch (Exception ex) {
Assert.assertEquals("Exception Message is not as desired",
" Following nodes does not exist : [host4:8889, host2:8889]",
ex.getMessage());
}
request =
NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.REPLACE,
ImmutableList.of(NodeToAttributes.newInstance("host5",
ImmutableList.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_CENTRALIZED, "x",
NodeAttributeType.STRING, "dfasdf")))),
false);
try {
rm.adminService.mapAttributesToNodes(request);
} catch (Exception ex) {
fail("This operation should not fail as failOnUnknownNodes is false : "
+ ex.getMessage());
}
Mockito.verify(spiedAttributesManager, Mockito.times(1))
.replaceNodeAttributes(Mockito.eq(NodeAttribute.PREFIX_CENTRALIZED),
Mockito.anyMap());
// 2. fail on invalid prefix
request =
NodesToAttributesMappingRequest
.newInstance(AttributeMappingOperationType.ADD,
ImmutableList.of(NodeToAttributes.newInstance("host5",
ImmutableList.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_DISTRIBUTED, "x",
NodeAttributeType.STRING, "dfasdf")))),
false);
try {
rm.adminService.mapAttributesToNodes(request);
fail("This operation should fail as prefix should be \"nm.yarn.io\".");
} catch (Exception ex) {
Assert.assertEquals("Exception Message is not as desired",
"Invalid Attribute Mapping for the node host5. Prefix should be "
+ "rm.yarn.io",
ex.getMessage());
}
rm.close();
}
private void setActiveAndInactiveNodes(ResourceManager resourceManager) {
Map<NodeId, RMNode> rmNodes = resourceManager.getRMContext().getRMNodes();
rmNodes.put(NodeId.newInstance("host1", 1111), new RMNodeImpl(null,
resourceManager.getRMContext(), "host1", 0, 0, null, null, null));
rmNodes.put(NodeId.newInstance("host2", 2222), new RMNodeImpl(null,
resourceManager.getRMContext(), "host2", 0, 0, null, null, null));
rmNodes.put(NodeId.newInstance("host3", 3333), new RMNodeImpl(null,
resourceManager.getRMContext(), "host3", 0, 0, null, null, null));
Map<NodeId, RMNode> rmInactiveNodes =
resourceManager.getRMContext().getInactiveRMNodes();
rmInactiveNodes.put(NodeId.newInstance("host4", 4444), new RMNodeImpl(null,
resourceManager.getRMContext(), "host4", 0, 0, null, null, null));
}
}