YARN-4855. Should check if node exists when replace nodelabels. Contributeed by Tao Jie
This commit is contained in:
parent
82857037b6
commit
6e130c308c
|
@ -44,4 +44,12 @@ public abstract class ReplaceLabelsOnNodeRequest {
|
|||
@Public
|
||||
@Evolving
|
||||
public abstract Map<NodeId, Set<String>> getNodeToLabels();
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract void setFailOnUnknownNodes(boolean failOnUnknownNodes);
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract boolean getFailOnUnknownNodes();
|
||||
}
|
||||
|
|
|
@ -99,10 +99,10 @@ message RemoveFromClusterNodeLabelsResponseProto {
|
|||
|
||||
message ReplaceLabelsOnNodeRequestProto {
|
||||
repeated NodeIdToLabelsNameProto nodeToLabels = 1;
|
||||
optional bool failOnUnknownNodes = 2;
|
||||
}
|
||||
|
||||
message ReplaceLabelsOnNodeResponseProto {
|
||||
|
||||
}
|
||||
|
||||
message UpdateNodeLabelsResponseProto {
|
||||
|
|
|
@ -130,11 +130,13 @@ public class RMAdminCLI extends HAAdmin {
|
|||
new UsageInfo("<label1,label2,label3> (label splitted by \",\")",
|
||||
"remove from cluster node labels"))
|
||||
.put("-replaceLabelsOnNode",
|
||||
new UsageInfo(
|
||||
new UsageInfo("[-failOnUnknownNodes] " +
|
||||
"<\"node1[:port]=label1,label2 node2[:port]=label1,label2\">",
|
||||
"replace labels on nodes"
|
||||
+ " (please note that we do not support specifying multiple"
|
||||
+ " labels on a single host for now.)"))
|
||||
"replace labels on nodes"
|
||||
+ " (please note that we do not support specifying multiple"
|
||||
+ " labels on a single host for now.)\n\t\t"
|
||||
+ "[-failOnUnknownNodes] is optional, when we set this"
|
||||
+ " option, it will fail if specified nodes are unknown."))
|
||||
.put("-directlyAccessNodeLabelStore",
|
||||
new UsageInfo("", "This is DEPRECATED, will be removed in future releases. Directly access node label store, "
|
||||
+ "with this option, all node label related operations"
|
||||
|
@ -246,8 +248,8 @@ public class RMAdminCLI extends HAAdmin {
|
|||
" [-addToClusterNodeLabels <\"label1(exclusive=true),"
|
||||
+ "label2(exclusive=false),label3\">]" +
|
||||
" [-removeFromClusterNodeLabels <label1,label2,label3>]" +
|
||||
" [-replaceLabelsOnNode <\"node1[:port]=label1,label2" +
|
||||
" node2[:port]=label1\">]" +
|
||||
" [-replaceLabelsOnNode [-failOnUnknownNodes] "
|
||||
+ "<\"node1[:port]=label1,label2 node2[:port]=label1\">]" +
|
||||
" [-directlyAccessNodeLabelStore]" +
|
||||
" [-refreshClusterMaxPriority]" +
|
||||
" [-updateNodeResource [NodeID] [MemSize] [vCores]" +
|
||||
|
@ -657,14 +659,14 @@ public class RMAdminCLI extends HAAdmin {
|
|||
return map;
|
||||
}
|
||||
|
||||
private int replaceLabelsOnNodes(String args) throws IOException,
|
||||
YarnException {
|
||||
private int replaceLabelsOnNodes(String args, boolean failOnUnknownNodes)
|
||||
throws IOException, YarnException {
|
||||
Map<NodeId, Set<String>> map = buildNodeLabelsMapFromStr(args);
|
||||
return replaceLabelsOnNodes(map);
|
||||
return replaceLabelsOnNodes(map, failOnUnknownNodes);
|
||||
}
|
||||
|
||||
private int replaceLabelsOnNodes(Map<NodeId, Set<String>> map)
|
||||
throws IOException, YarnException {
|
||||
private int replaceLabelsOnNodes(Map<NodeId, Set<String>> map,
|
||||
boolean failOnUnknownNodes) throws IOException, YarnException {
|
||||
if (directlyAccessNodeLabelStore) {
|
||||
getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map);
|
||||
} else {
|
||||
|
@ -672,6 +674,7 @@ public class RMAdminCLI extends HAAdmin {
|
|||
createAdminProtocol();
|
||||
ReplaceLabelsOnNodeRequest request =
|
||||
ReplaceLabelsOnNodeRequest.newInstance(map);
|
||||
request.setFailOnUnknownNodes(failOnUnknownNodes);
|
||||
adminProtocol.replaceLabelsOnNode(request);
|
||||
}
|
||||
return 0;
|
||||
|
@ -783,8 +786,16 @@ public class RMAdminCLI extends HAAdmin {
|
|||
System.err.println(NO_MAPPING_ERR_MSG);
|
||||
printUsage("", isHAEnabled);
|
||||
exitCode = -1;
|
||||
} else if ("-failOnUnknownNodes".equals(args[i])) {
|
||||
if (i + 1 >= args.length) {
|
||||
System.err.println(NO_MAPPING_ERR_MSG);
|
||||
printUsage("", isHAEnabled);
|
||||
exitCode = -1;
|
||||
} else {
|
||||
exitCode = replaceLabelsOnNodes(args[i + 1], true);
|
||||
}
|
||||
} else {
|
||||
exitCode = replaceLabelsOnNodes(args[i]);
|
||||
exitCode = replaceLabelsOnNodes(args[i], false);
|
||||
}
|
||||
} else {
|
||||
exitCode = -1;
|
||||
|
|
|
@ -469,7 +469,7 @@ public class TestRMAdminCLI {
|
|||
"[username]] [-addToClusterNodeLabels " +
|
||||
"<\"label1(exclusive=true),label2(exclusive=false),label3\">] " +
|
||||
"[-removeFromClusterNodeLabels <label1,label2,label3>] " +
|
||||
"[-replaceLabelsOnNode " +
|
||||
"[-replaceLabelsOnNode [-failOnUnknownNodes] " +
|
||||
"<\"node1[:port]=label1,label2 node2[:port]=label1\">] " +
|
||||
"[-directlyAccessNodeLabelStore] [-refreshClusterMaxPriority] " +
|
||||
"[-updateNodeResource [NodeID] [MemSize] [vCores] " +
|
||||
|
@ -564,6 +564,7 @@ public class TestRMAdminCLI {
|
|||
+ " [username]] [-addToClusterNodeLabels <\"label1(exclusive=true),"
|
||||
+ "label2(exclusive=false),label3\">]"
|
||||
+ " [-removeFromClusterNodeLabels <label1,label2,label3>] [-replaceLabelsOnNode "
|
||||
+ "[-failOnUnknownNodes] "
|
||||
+ "<\"node1[:port]=label1,label2 node2[:port]=label1\">] [-directlyAccessNodeLabelStore] "
|
||||
+ "[-refreshClusterMaxPriority] "
|
||||
+ "[-updateNodeResource [NodeID] [MemSize] [vCores] "
|
||||
|
|
|
@ -146,6 +146,18 @@ public class ReplaceLabelsOnNodeRequestPBImpl extends
|
|||
nodeIdToLabels.putAll(map);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getFailOnUnknownNodes() {
|
||||
ReplaceLabelsOnNodeRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getFailOnUnknownNodes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFailOnUnknownNodes(boolean failOnUnknownNodes) {
|
||||
maybeInitBuilder();
|
||||
builder.setFailOnUnknownNodes(failOnUnknownNodes);
|
||||
}
|
||||
|
||||
private NodeIdProto convertToProtoFormat(NodeId t) {
|
||||
return ((NodeIdPBImpl) t).getProto();
|
||||
}
|
||||
|
|
|
@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -806,6 +809,49 @@ public class AdminService extends CompositeService implements
|
|||
|
||||
ReplaceLabelsOnNodeResponse response =
|
||||
recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class);
|
||||
|
||||
if (request.getFailOnUnknownNodes()) {
|
||||
// verify if nodes have registered to RM
|
||||
List<NodeId> unknownNodes = new ArrayList<>();
|
||||
for (NodeId requestedNode : request.getNodeToLabels().keySet()) {
|
||||
boolean isKnown = false;
|
||||
// both active and inactive nodes are recognized as known nodes
|
||||
if (requestedNode.getPort() != 0) {
|
||||
if (rmContext.getRMNodes().containsKey(requestedNode)
|
||||
|| rmContext.getInactiveRMNodes().containsKey(requestedNode)) {
|
||||
isKnown = true;
|
||||
}
|
||||
} else {
|
||||
for (NodeId knownNode : rmContext.getRMNodes().keySet()) {
|
||||
if (knownNode.getHost().equals(requestedNode.getHost())) {
|
||||
isKnown = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!isKnown) {
|
||||
for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) {
|
||||
if (knownNode.getHost().equals(requestedNode.getHost())) {
|
||||
isKnown = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!isKnown) {
|
||||
unknownNodes.add(requestedNode);
|
||||
}
|
||||
}
|
||||
|
||||
if (!unknownNodes.isEmpty()) {
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), operation, "",
|
||||
"AdminService",
|
||||
"Failed to replace labels as there are unknown nodes:"
|
||||
+ Arrays.toString(unknownNodes.toArray()));
|
||||
throw RPCUtil.getRemoteException(new IOException(
|
||||
"Failed to replace labels as there are unknown nodes:"
|
||||
+ Arrays.toString(unknownNodes.toArray())));
|
||||
}
|
||||
}
|
||||
try {
|
||||
rmContext.getNodeLabelManager().replaceLabelsOnNode(
|
||||
request.getNodeToLabels());
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.io.IOException;
|
|||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -64,9 +65,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -1085,6 +1086,106 @@ public class TestRMAdminService {
|
|||
rm.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModifyLabelsOnUnknownNodes() throws IOException,
|
||||
YarnException {
|
||||
// create RM and set it's ACTIVE, and set distributed node label
|
||||
// configuration to true
|
||||
rm = new MockRM();
|
||||
|
||||
((RMContextImpl) rm.getRMContext())
|
||||
.setHAServiceState(HAServiceState.ACTIVE);
|
||||
Map<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
|
||||
rmNodes.put(NodeId.newInstance("host1", 1111),
|
||||
new RMNodeImpl(null, rm.getRMContext(), "host1", 0, 0, null, null,
|
||||
null));
|
||||
rmNodes.put(NodeId.newInstance("host2", 2222),
|
||||
new RMNodeImpl(null, rm.getRMContext(), "host2", 0, 0, null, null,
|
||||
null));
|
||||
rmNodes.put(NodeId.newInstance("host3", 3333),
|
||||
new RMNodeImpl(null, rm.getRMContext(), "host3", 0, 0, null, null,
|
||||
null));
|
||||
Map<NodeId, RMNode> rmInactiveNodes = rm.getRMContext()
|
||||
.getInactiveRMNodes();
|
||||
rmInactiveNodes.put(NodeId.newInstance("host4", 4444),
|
||||
new RMNodeImpl(null, rm.getRMContext(), "host4", 0, 0, null, null,
|
||||
null));
|
||||
RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
|
||||
|
||||
// by default, distributed configuration for node label is disabled, this
|
||||
// should pass
|
||||
labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x",
|
||||
"y"));
|
||||
// replace known node
|
||||
ReplaceLabelsOnNodeRequest request1 = ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host1", 1111),
|
||||
(Set<String>) ImmutableSet.of("x")));
|
||||
request1.setFailOnUnknownNodes(true);
|
||||
try {
|
||||
rm.adminService.replaceLabelsOnNode(request1);
|
||||
} catch (Exception ex) {
|
||||
fail("should not fail on known node");
|
||||
}
|
||||
|
||||
// replace known node with wildcard port
|
||||
ReplaceLabelsOnNodeRequest request2 = ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host1", 0),
|
||||
(Set<String>) ImmutableSet.of("x")));
|
||||
request2.setFailOnUnknownNodes(true);
|
||||
try {
|
||||
rm.adminService.replaceLabelsOnNode(request2);
|
||||
} catch (Exception ex) {
|
||||
fail("should not fail on known node");
|
||||
}
|
||||
|
||||
// replace unknown node
|
||||
ReplaceLabelsOnNodeRequest request3 = ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host5", 0),
|
||||
(Set<String>) ImmutableSet.of("x")));
|
||||
request3.setFailOnUnknownNodes(true);
|
||||
try {
|
||||
rm.adminService.replaceLabelsOnNode(request3);
|
||||
fail("Should fail on unknown node");
|
||||
} catch (Exception ex) {
|
||||
}
|
||||
|
||||
// replace known node but wrong port
|
||||
ReplaceLabelsOnNodeRequest request4 = ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host2", 1111),
|
||||
(Set<String>) ImmutableSet.of("x")));
|
||||
request4.setFailOnUnknownNodes(true);
|
||||
try {
|
||||
rm.adminService.replaceLabelsOnNode(request4);
|
||||
fail("Should fail on node with wrong port");
|
||||
} catch (Exception ex) {
|
||||
}
|
||||
|
||||
// replace non-exist node but not check
|
||||
ReplaceLabelsOnNodeRequest request5 = ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host5", 0),
|
||||
(Set<String>) ImmutableSet.of("x")));
|
||||
request5.setFailOnUnknownNodes(false);
|
||||
try {
|
||||
rm.adminService.replaceLabelsOnNode(request5);
|
||||
} catch (Exception ex) {
|
||||
fail("Should not fail on unknown node when "
|
||||
+ "fail-on-unkown-nodes is set false");
|
||||
}
|
||||
|
||||
// replace on inactive node
|
||||
ReplaceLabelsOnNodeRequest request6 = ReplaceLabelsOnNodeRequest
|
||||
.newInstance(ImmutableMap.of(NodeId.newInstance("host4", 0),
|
||||
(Set<String>) ImmutableSet.of("x")));
|
||||
request6.setFailOnUnknownNodes(true);
|
||||
try {
|
||||
rm.adminService.replaceLabelsOnNode(request6);
|
||||
} catch (Exception ex) {
|
||||
fail("should not fail on inactive node");
|
||||
}
|
||||
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled()
|
||||
throws IOException, YarnException {
|
||||
|
|
Loading…
Reference in New Issue