From 04f594dbea98d45e886c8ef19ac42f22971b757f Mon Sep 17 00:00:00 2001 From: Naganarasimha Date: Mon, 3 Oct 2016 02:02:26 -0400 Subject: [PATCH] YARN-4855. Should check if node exists when replace nodelabels. Contributeed by Tao Jie (cherry picked from commit 6e130c308cf1b97e8386b6a43c26d72d2850119c) --- .../ReplaceLabelsOnNodeRequest.java | 8 ++ ...erver_resourcemanager_service_protos.proto | 2 +- .../hadoop/yarn/client/cli/RMAdminCLI.java | 39 ++++--- .../yarn/client/cli/TestRMAdminCLI.java | 3 +- .../pb/ReplaceLabelsOnNodeRequestPBImpl.java | 14 ++- .../server/resourcemanager/AdminService.java | 46 ++++++++ .../resourcemanager/TestRMAdminService.java | 103 +++++++++++++++++- 7 files changed, 197 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java index 28e261a5895..1b8e687b3dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java @@ -44,4 +44,12 @@ public static ReplaceLabelsOnNodeRequest newInstance( @Public @Evolving public abstract Map> getNodeToLabels(); + + @Public + @Evolving + public abstract void setFailOnUnknownNodes(boolean failOnUnknownNodes); + + @Public + @Evolving + public abstract boolean getFailOnUnknownNodes(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index b9f30db46ee..16d80974c83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -99,10 +99,10 @@ message RemoveFromClusterNodeLabelsResponseProto { message ReplaceLabelsOnNodeRequestProto { repeated NodeIdToLabelsNameProto nodeToLabels = 1; + optional bool failOnUnknownNodes = 2; } message ReplaceLabelsOnNodeResponseProto { - } message UpdateNodeLabelsResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 7a898a12a38..640f8e36cfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -130,11 +130,13 @@ public class RMAdminCLI extends HAAdmin { new UsageInfo(" (label splitted by \",\")", "remove from cluster node labels")) .put("-replaceLabelsOnNode", - new UsageInfo( + new UsageInfo("[-failOnUnknownNodes] " + "<\"node1[:port]=label1,label2 node2[:port]=label1,label2\">", - "replace labels on nodes" - + " (please note that we do not support specifying multiple" - + " labels on a single host for now.)")) + "replace labels on nodes" + + " (please note that we do not support specifying multiple" + + " labels on a single host for now.)\n\t\t" + + "[-failOnUnknownNodes] is optional, when we set this" + + " option, it will fail if specified nodes are unknown.")) .put("-directlyAccessNodeLabelStore", new UsageInfo("", "This is DEPRECATED, will be removed in future releases. Directly access node label store, " + "with this option, all node label related operations" @@ -246,8 +248,8 @@ private static void printHelp(String cmd, boolean isHAEnabled) { " [-addToClusterNodeLabels <\"label1(exclusive=true)," + "label2(exclusive=false),label3\">]" + " [-removeFromClusterNodeLabels ]" + - " [-replaceLabelsOnNode <\"node1[:port]=label1,label2" + - " node2[:port]=label1\">]" + + " [-replaceLabelsOnNode [-failOnUnknownNodes] " + + "<\"node1[:port]=label1,label2 node2[:port]=label1\">]" + " [-directlyAccessNodeLabelStore]" + " [-refreshClusterMaxPriority]" + " [-updateNodeResource [NodeID] [MemSize] [vCores]" + @@ -302,7 +304,7 @@ protected ResourceManagerAdministrationProtocol createAdminProtocol() return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); } - + private int refreshQueues() throws IOException, YarnException { // Refresh the queue properties ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); @@ -657,14 +659,14 @@ private Map> buildNodeLabelsMapFromStr(String args) { return map; } - private int replaceLabelsOnNodes(String args) throws IOException, - YarnException { + private int replaceLabelsOnNodes(String args, boolean failOnUnknownNodes) + throws IOException, YarnException { Map> map = buildNodeLabelsMapFromStr(args); - return replaceLabelsOnNodes(map); + return replaceLabelsOnNodes(map, failOnUnknownNodes); } - private int replaceLabelsOnNodes(Map> map) - throws IOException, YarnException { + private int replaceLabelsOnNodes(Map> map, + boolean failOnUnknownNodes) throws IOException, YarnException { if (directlyAccessNodeLabelStore) { getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map); } else { @@ -672,11 +674,12 @@ private int replaceLabelsOnNodes(Map> map) createAdminProtocol(); ReplaceLabelsOnNodeRequest request = ReplaceLabelsOnNodeRequest.newInstance(map); + request.setFailOnUnknownNodes(failOnUnknownNodes); adminProtocol.replaceLabelsOnNode(request); } return 0; } - + @Override public int run(String[] args) throws Exception { // -directlyAccessNodeLabelStore is a additional option for node label @@ -783,8 +786,16 @@ public int run(String[] args) throws Exception { System.err.println(NO_MAPPING_ERR_MSG); printUsage("", isHAEnabled); exitCode = -1; + } else if ("-failOnUnknownNodes".equals(args[i])) { + if (i + 1 >= args.length) { + System.err.println(NO_MAPPING_ERR_MSG); + printUsage("", isHAEnabled); + exitCode = -1; + } else { + exitCode = replaceLabelsOnNodes(args[i + 1], true); + } } else { - exitCode = replaceLabelsOnNodes(args[i]); + exitCode = replaceLabelsOnNodes(args[i], false); } } else { exitCode = -1; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index bea6e39e1dd..9e20a4359a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -469,7 +469,7 @@ public void testHelp() throws Exception { "[username]] [-addToClusterNodeLabels " + "<\"label1(exclusive=true),label2(exclusive=false),label3\">] " + "[-removeFromClusterNodeLabels ] " + - "[-replaceLabelsOnNode " + + "[-replaceLabelsOnNode [-failOnUnknownNodes] " + "<\"node1[:port]=label1,label2 node2[:port]=label1\">] " + "[-directlyAccessNodeLabelStore] [-refreshClusterMaxPriority] " + "[-updateNodeResource [NodeID] [MemSize] [vCores] " + @@ -564,6 +564,7 @@ public void testHelp() throws Exception { + " [username]] [-addToClusterNodeLabels <\"label1(exclusive=true)," + "label2(exclusive=false),label3\">]" + " [-removeFromClusterNodeLabels ] [-replaceLabelsOnNode " + + "[-failOnUnknownNodes] " + "<\"node1[:port]=label1,label2 node2[:port]=label1\">] [-directlyAccessNodeLabelStore] " + "[-refreshClusterMaxPriority] " + "[-updateNodeResource [NodeID] [MemSize] [vCores] " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java index 22e561cd94a..3b15b27d78b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java @@ -146,10 +146,22 @@ public void setNodeToLabels(Map> map) { nodeIdToLabels.putAll(map); } + @Override + public boolean getFailOnUnknownNodes() { + ReplaceLabelsOnNodeRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getFailOnUnknownNodes(); + } + + @Override + public void setFailOnUnknownNodes(boolean failOnUnknownNodes) { + maybeInitBuilder(); + builder.setFailOnUnknownNodes(failOnUnknownNodes); + } + private NodeIdProto convertToProtoFormat(NodeId t) { return ((NodeIdPBImpl) t).getProto(); } - + @Override public int hashCode() { assert false : "hashCode not designed"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index db55264c483..33daf7f09bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; @@ -806,6 +809,49 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( ReplaceLabelsOnNodeResponse response = recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class); + + if (request.getFailOnUnknownNodes()) { + // verify if nodes have registered to RM + List unknownNodes = new ArrayList<>(); + for (NodeId requestedNode : request.getNodeToLabels().keySet()) { + boolean isKnown = false; + // both active and inactive nodes are recognized as known nodes + if (requestedNode.getPort() != 0) { + if (rmContext.getRMNodes().containsKey(requestedNode) + || rmContext.getInactiveRMNodes().containsKey(requestedNode)) { + isKnown = true; + } + } else { + for (NodeId knownNode : rmContext.getRMNodes().keySet()) { + if (knownNode.getHost().equals(requestedNode.getHost())) { + isKnown = true; + break; + } + } + if (!isKnown) { + for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) { + if (knownNode.getHost().equals(requestedNode.getHost())) { + isKnown = true; + break; + } + } + } + } + if (!isKnown) { + unknownNodes.add(requestedNode); + } + } + + if (!unknownNodes.isEmpty()) { + RMAuditLogger.logFailure(user.getShortUserName(), operation, "", + "AdminService", + "Failed to replace labels as there are unknown nodes:" + + Arrays.toString(unknownNodes.toArray())); + throw RPCUtil.getRemoteException(new IOException( + "Failed to replace labels as there are unknown nodes:" + + Arrays.toString(unknownNodes.toArray()))); + } + } try { rmContext.getNodeLabelManager().replaceLabelsOnNode( request.getNodeToLabels()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 0b65c0b50c9..a3022f7a487 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -28,6 +28,7 @@ import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -64,9 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -1085,6 +1086,106 @@ public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled() rm.close(); } + @Test + public void testModifyLabelsOnUnknownNodes() throws IOException, + YarnException { + // create RM and set it's ACTIVE, and set distributed node label + // configuration to true + rm = new MockRM(); + + ((RMContextImpl) rm.getRMContext()) + .setHAServiceState(HAServiceState.ACTIVE); + Map rmNodes = rm.getRMContext().getRMNodes(); + rmNodes.put(NodeId.newInstance("host1", 1111), + new RMNodeImpl(null, rm.getRMContext(), "host1", 0, 0, null, null, + null)); + rmNodes.put(NodeId.newInstance("host2", 2222), + new RMNodeImpl(null, rm.getRMContext(), "host2", 0, 0, null, null, + null)); + rmNodes.put(NodeId.newInstance("host3", 3333), + new RMNodeImpl(null, rm.getRMContext(), "host3", 0, 0, null, null, + null)); + Map rmInactiveNodes = rm.getRMContext() + .getInactiveRMNodes(); + rmInactiveNodes.put(NodeId.newInstance("host4", 4444), + new RMNodeImpl(null, rm.getRMContext(), "host4", 0, 0, null, null, + null)); + RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); + + // by default, distributed configuration for node label is disabled, this + // should pass + labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", + "y")); + // replace known node + ReplaceLabelsOnNodeRequest request1 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host1", 1111), + (Set) ImmutableSet.of("x"))); + request1.setFailOnUnknownNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request1); + } catch (Exception ex) { + fail("should not fail on known node"); + } + + // replace known node with wildcard port + ReplaceLabelsOnNodeRequest request2 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host1", 0), + (Set) ImmutableSet.of("x"))); + request2.setFailOnUnknownNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request2); + } catch (Exception ex) { + fail("should not fail on known node"); + } + + // replace unknown node + ReplaceLabelsOnNodeRequest request3 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host5", 0), + (Set) ImmutableSet.of("x"))); + request3.setFailOnUnknownNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request3); + fail("Should fail on unknown node"); + } catch (Exception ex) { + } + + // replace known node but wrong port + ReplaceLabelsOnNodeRequest request4 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host2", 1111), + (Set) ImmutableSet.of("x"))); + request4.setFailOnUnknownNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request4); + fail("Should fail on node with wrong port"); + } catch (Exception ex) { + } + + // replace non-exist node but not check + ReplaceLabelsOnNodeRequest request5 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host5", 0), + (Set) ImmutableSet.of("x"))); + request5.setFailOnUnknownNodes(false); + try { + rm.adminService.replaceLabelsOnNode(request5); + } catch (Exception ex) { + fail("Should not fail on unknown node when " + + "fail-on-unkown-nodes is set false"); + } + + // replace on inactive node + ReplaceLabelsOnNodeRequest request6 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host4", 0), + (Set) ImmutableSet.of("x"))); + request6.setFailOnUnknownNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request6); + } catch (Exception ex) { + fail("should not fail on inactive node"); + } + + rm.close(); + } + @Test public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled() throws IOException, YarnException {