YARN-7119. Support multiple resource types in rmadmin updateNodeResource command. Contributed by Manikandan R.
This commit is contained in:
parent
91c96bdf8e
commit
2564b4d07f
|
@ -401,7 +401,7 @@ public class ResourceUtils {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized static void resetResourceTypes() {
|
||||
public synchronized static void resetResourceTypes() {
|
||||
initializedResources = false;
|
||||
}
|
||||
|
||||
|
@ -415,16 +415,37 @@ public class ResourceUtils {
|
|||
}
|
||||
|
||||
public static String getUnits(String resourceValue) {
|
||||
String units;
|
||||
for (int i = 0; i < resourceValue.length(); i++) {
|
||||
return parseResourceValue(resourceValue)[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract unit and actual value from resource value.
|
||||
* @param resourceValue Value of the resource
|
||||
* @return Array containing unit and value. [0]=unit, [1]=value
|
||||
* @throws IllegalArgumentExcpetion if units contain non alpha characters
|
||||
*/
|
||||
public static String[] parseResourceValue(String resourceValue) {
|
||||
String[] resource = new String[2];
|
||||
int i = 0;
|
||||
for (; i < resourceValue.length(); i++) {
|
||||
if (Character.isAlphabetic(resourceValue.charAt(i))) {
|
||||
units = resourceValue.substring(i);
|
||||
if (StringUtils.isAlpha(units)) {
|
||||
return units;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
String units = resourceValue.substring(i);
|
||||
|
||||
if((StringUtils.isAlpha(units))) {
|
||||
resource[0] = units;
|
||||
resource[1] = resourceValue.substring(0, i);
|
||||
return resource;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Units '" + units + "'"
|
||||
+ " contains non alphabet characters, which is not allowed.");
|
||||
}
|
||||
}
|
||||
|
||||
public static long getValue(String resourceValue) {
|
||||
return Long.parseLong(parseResourceValue(resourceValue)[1]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -628,6 +649,49 @@ public class ResourceUtils {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
/**
|
||||
* Are mandatory resources like memory-mb, vcores available?
|
||||
* If not, throw exceptions. On availability, ensure those values are
|
||||
* within boundary.
|
||||
* @param res resource
|
||||
* @throws IllegalArgumentException if mandatory resource is not available or
|
||||
* value is not within boundary
|
||||
*/
|
||||
public static void areMandatoryResourcesAvailable(Resource res) {
|
||||
ResourceInformation memoryResourceInformation =
|
||||
res.getResourceInformation(MEMORY);
|
||||
if (memoryResourceInformation != null) {
|
||||
long value = memoryResourceInformation.getValue();
|
||||
if (value > Integer.MAX_VALUE) {
|
||||
throw new IllegalArgumentException("Value '" + value + "' for "
|
||||
+ "resource memory is more than the maximum for an integer.");
|
||||
}
|
||||
if (value == 0) {
|
||||
throw new IllegalArgumentException("Invalid value for resource '" +
|
||||
MEMORY + "'. Value cannot be 0(zero).");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Mandatory resource 'memory-mb' "
|
||||
+ "is missing.");
|
||||
}
|
||||
|
||||
ResourceInformation vcoresResourceInformation =
|
||||
res.getResourceInformation(VCORES);
|
||||
if (vcoresResourceInformation != null) {
|
||||
long value = vcoresResourceInformation.getValue();
|
||||
if (value > Integer.MAX_VALUE) {
|
||||
throw new IllegalArgumentException("Value '" + value + "' for resource"
|
||||
+ " vcores is more than the maximum for an integer.");
|
||||
}
|
||||
if (value == 0) {
|
||||
throw new IllegalArgumentException("Invalid value for resource '" +
|
||||
VCORES + "'. Value cannot be 0(zero).");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Mandatory resource 'vcores' "
|
||||
+ "is missing.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an array of {@link ResourceInformation} objects corresponding to
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.commons.cli.MissingArgumentException;
|
|||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.DecommissionType;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
|
||||
|
@ -74,6 +77,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLa
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -96,6 +100,8 @@ public class RMAdminCLI extends HAAdmin {
|
|||
private static final String ADD_LABEL_FORMAT_ERR_MSG =
|
||||
"Input format for adding node-labels is not correct, it should be "
|
||||
+ "labelName1[(exclusive=true/false)],LabelName2[] ..";
|
||||
private static final Pattern RESOURCE_TYPES_ARGS_PATTERN =
|
||||
Pattern.compile("^[0-9]*$");
|
||||
|
||||
protected final static Map<String, UsageInfo> ADMIN_USAGE =
|
||||
ImmutableMap.<String, UsageInfo>builder()
|
||||
|
@ -159,7 +165,9 @@ public class RMAdminCLI extends HAAdmin {
|
|||
new UsageInfo("",
|
||||
"Refresh cluster max priority"))
|
||||
.put("-updateNodeResource",
|
||||
new UsageInfo("[NodeID] [MemSize] [vCores] ([OvercommitTimeout])",
|
||||
new UsageInfo("[NodeID] [MemSize] [vCores] ([OvercommitTimeout])"
|
||||
+ " \n\t\tor\n\t\t[NodeID] [resourcetypes] "
|
||||
+ "([OvercommitTimeout]). ",
|
||||
"Update resource on specific node."))
|
||||
.build();
|
||||
|
||||
|
@ -262,26 +270,27 @@ public class RMAdminCLI extends HAAdmin {
|
|||
StringBuilder summary = new StringBuilder();
|
||||
summary.append("rmadmin is the command to execute YARN administrative " +
|
||||
"commands.\n");
|
||||
summary.append("The full syntax is: \n\n" +
|
||||
"yarn rmadmin" +
|
||||
" [-refreshQueues]" +
|
||||
" [-refreshNodes [-g|graceful [timeout in seconds] -client|server]]" +
|
||||
" [-refreshNodesResources]" +
|
||||
" [-refreshSuperUserGroupsConfiguration]" +
|
||||
" [-refreshUserToGroupsMappings]" +
|
||||
" [-refreshAdminAcls]" +
|
||||
" [-refreshServiceAcl]" +
|
||||
" [-getGroup [username]]" +
|
||||
" [-addToClusterNodeLabels <\"label1(exclusive=true),"
|
||||
+ "label2(exclusive=false),label3\">]" +
|
||||
" [-removeFromClusterNodeLabels <label1,label2,label3>]" +
|
||||
" [-replaceLabelsOnNode " +
|
||||
"<\"node1[:port]=label1,label2 node2[:port]=label1\"> " +
|
||||
"[-failOnUnknownNodes]]" +
|
||||
" [-directlyAccessNodeLabelStore]" +
|
||||
" [-refreshClusterMaxPriority]" +
|
||||
" [-updateNodeResource [NodeID] [MemSize] [vCores]" +
|
||||
" ([OvercommitTimeout])");
|
||||
summary.append("The full syntax is: \n\n"
|
||||
+ "yarn rmadmin"
|
||||
+ " [-refreshQueues]"
|
||||
+ " [-refreshNodes [-g|graceful [timeout in seconds] -client|server]]"
|
||||
+ " [-refreshNodesResources]"
|
||||
+ " [-refreshSuperUserGroupsConfiguration]"
|
||||
+ " [-refreshUserToGroupsMappings]"
|
||||
+ " [-refreshAdminAcls]"
|
||||
+ " [-refreshServiceAcl]"
|
||||
+ " [-getGroup [username]]"
|
||||
+ " [-addToClusterNodeLabels <\"label1(exclusive=true),"
|
||||
+ "label2(exclusive=false),label3\">]"
|
||||
+ " [-removeFromClusterNodeLabels <label1,label2,label3>]"
|
||||
+ " [-replaceLabelsOnNode "
|
||||
+ "<\"node1[:port]=label1,label2 node2[:port]=label1\"> "
|
||||
+ "[-failOnUnknownNodes]]"
|
||||
+ " [-directlyAccessNodeLabelStore]"
|
||||
+ " [-refreshClusterMaxPriority]"
|
||||
+ " [-updateNodeResource [NodeID] [MemSize] [vCores]"
|
||||
+ " ([OvercommitTimeout]) or -updateNodeResource [NodeID] "
|
||||
+ "[ResourceTypes] ([OvercommitTimeout])]");
|
||||
if (isHAEnabled) {
|
||||
appendHAUsage(summary);
|
||||
}
|
||||
|
@ -471,20 +480,14 @@ public class RMAdminCLI extends HAAdmin {
|
|||
return 0;
|
||||
}
|
||||
|
||||
private int updateNodeResource(String nodeIdStr, int memSize,
|
||||
int cores, int overCommitTimeout) throws IOException, YarnException {
|
||||
// check resource value first
|
||||
if (invalidResourceValue(memSize, cores)) {
|
||||
throw new IllegalArgumentException("Invalid resource value: " + "(" +
|
||||
memSize + "," + cores + ") for updateNodeResource.");
|
||||
}
|
||||
// Refresh the nodes
|
||||
private int updateNodeResource(String nodeIdStr, Resource resource,
|
||||
int overCommitTimeout) throws YarnException, IOException {
|
||||
|
||||
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
|
||||
UpdateNodeResourceRequest request =
|
||||
recordFactory.newRecordInstance(UpdateNodeResourceRequest.class);
|
||||
NodeId nodeId = NodeId.fromString(nodeIdStr);
|
||||
|
||||
Resource resource = Resources.createResource(memSize, cores);
|
||||
|
||||
Map<NodeId, ResourceOption> resourceMap =
|
||||
new HashMap<NodeId, ResourceOption>();
|
||||
resourceMap.put(
|
||||
|
@ -919,26 +922,95 @@ public class RMAdminCLI extends HAAdmin {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle resources of two different formats:
|
||||
*
|
||||
* 1. -updateNodeResource [NodeID] [MemSize] [vCores] ([overCommitTimeout])
|
||||
* 2. -updateNodeResource [NodeID] [ResourceTypes] ([overCommitTimeout])
|
||||
*
|
||||
* Incase of No. of args is 4 or 5, 2nd arg should contain only numbers to
|
||||
* satisfy the 1st format. Otherwise, 2nd format flow continues.
|
||||
* @param args arguments of the command
|
||||
* @param cmd whole command to be parsed
|
||||
* @param isHAEnabled Is HA enabled or not?
|
||||
* @return 1 on success, -1 on errors
|
||||
* @throws IOException if any issues thrown from RPC layer
|
||||
* @throws YarnException if any issues thrown from server
|
||||
*/
|
||||
private int handleUpdateNodeResource(
|
||||
String[] args, String cmd, boolean isHAEnabled)
|
||||
throws NumberFormatException, IOException, YarnException {
|
||||
throws YarnException, IOException {
|
||||
int i = 1;
|
||||
if (args.length < 4 || args.length > 5) {
|
||||
int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
|
||||
String nodeID = args[i++];
|
||||
Resource resource = Resource.newInstance(0, 0);
|
||||
if (args.length < 3 || args.length > 5) {
|
||||
System.err.println("Number of parameters specified for " +
|
||||
"updateNodeResource is wrong.");
|
||||
printUsage(cmd, isHAEnabled);
|
||||
return -1;
|
||||
} else {
|
||||
String nodeID = args[i++];
|
||||
String memSize = args[i++];
|
||||
String cores = args[i++];
|
||||
int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
|
||||
if (i == args.length - 1) {
|
||||
overCommitTimeout = Integer.parseInt(args[i]);
|
||||
} else if ((args.length == 4 || args.length == 5) &&
|
||||
RESOURCE_TYPES_ARGS_PATTERN.matcher(args[2]).matches()) {
|
||||
int memSize = Integer.parseInt(args[i++]);
|
||||
int cores = Integer.parseInt(args[i++]);
|
||||
|
||||
// check resource value first
|
||||
if (invalidResourceValue(memSize, cores)) {
|
||||
throw new IllegalArgumentException("Invalid resource value: " + "(" +
|
||||
memSize + "," + cores + ") for updateNodeResource.");
|
||||
}
|
||||
return updateNodeResource(nodeID, Integer.parseInt(memSize),
|
||||
Integer.parseInt(cores), overCommitTimeout);
|
||||
resource = Resources.createResource(memSize, cores);
|
||||
} else {
|
||||
String resourceTypes = args[i++];
|
||||
if (!resourceTypes.contains("=")) {
|
||||
System.err.println("Resource Types parameter specified for "
|
||||
+ "updateNodeResource is wrong. It should be comma-delimited "
|
||||
+ "key value pairs. For example, memory-mb=1024Mi,"
|
||||
+ "vcores=1,resource1=3Gi,resource2=2");
|
||||
printUsage(cmd, isHAEnabled);
|
||||
return -1;
|
||||
}
|
||||
resource = parseCommandAndCreateResource(resourceTypes);
|
||||
ResourceUtils.areMandatoryResourcesAvailable(resource);
|
||||
}
|
||||
if (i == args.length - 1) {
|
||||
overCommitTimeout = Integer.parseInt(args[i]);
|
||||
}
|
||||
return updateNodeResource(nodeID, resource, overCommitTimeout);
|
||||
}
|
||||
|
||||
private Resource parseCommandAndCreateResource(String resourceTypes) {
|
||||
Resource resource = Resource.newInstance(0, 0);
|
||||
Map<String, ResourceInformation> resourceTypesFromRM =
|
||||
ResourceUtils.getResourceTypes();
|
||||
String[] resourceTypesArr = resourceTypes.split(",");
|
||||
for (int k = 0; k < resourceTypesArr.length; k++) {
|
||||
String resourceType = resourceTypesArr[k];
|
||||
String[] resourceTypeArray = resourceType.split("=");
|
||||
if (resourceTypeArray.length == 2) {
|
||||
String resName = StringUtils.trim(resourceTypeArray[0]);
|
||||
String resValue = StringUtils.trim(resourceTypeArray[1]);
|
||||
if (resourceTypesFromRM.containsKey(resName)) {
|
||||
String[] resourceValue = ResourceUtils.parseResourceValue(resValue);
|
||||
if (resourceValue.length == 2) {
|
||||
ResourceInformation ri = ResourceInformation.newInstance(resName,
|
||||
resourceValue[0], Long.parseLong(resourceValue[1]));
|
||||
resource.setResourceInformation(resName, ri);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid resource value: " +
|
||||
resValue + ". Unable to extract unit and actual value.");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid resource type: " +
|
||||
resName + ". Not allowed.");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid resource type value: " +
|
||||
"("+ resourceType + ") for updateNodeResource. "
|
||||
+ "It should be key value pairs separated using '=' symbol.");
|
||||
}
|
||||
}
|
||||
return resource;
|
||||
}
|
||||
|
||||
private int validateTimeout(String strTimeout) {
|
||||
|
|
|
@ -34,13 +34,16 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
|
@ -50,6 +53,7 @@ import org.apache.hadoop.service.Service.STATE;
|
|||
import org.apache.hadoop.yarn.api.records.DecommissionType;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -70,7 +74,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
@ -92,6 +98,27 @@ public class TestRMAdminCLI {
|
|||
private boolean remoteAdminServiceAccessed = false;
|
||||
private static final String HOST_A = "1.2.3.1";
|
||||
private static final String HOST_B = "1.2.3.2";
|
||||
private static File dest;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
ResourceUtils.resetResourceTypes();
|
||||
Configuration yarnConf = new YarnConfiguration();
|
||||
String resourceTypesFile = "resource-types-4.xml";
|
||||
InputStream source =
|
||||
yarnConf.getClassLoader().getResourceAsStream(resourceTypesFile);
|
||||
dest = new File(yarnConf.getClassLoader().
|
||||
getResource(".").getPath(), "resource-types.xml");
|
||||
FileUtils.copyInputStreamToFile(source, dest);
|
||||
ResourceUtils.getResourceTypes();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
if (dest.exists()) {
|
||||
dest.delete();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("static-access")
|
||||
@Before
|
||||
|
@ -256,17 +283,181 @@ public class TestRMAdminCLI {
|
|||
resource);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceWithOverCommitTimeout() throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
int memSize = 2048;
|
||||
int cores = 2;
|
||||
int timeout = 1000;
|
||||
String[] args = {"-updateNodeResource", nodeIdStr,
|
||||
Integer.toString(memSize), Integer.toString(cores),
|
||||
Integer.toString(timeout)};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
ArgumentCaptor<UpdateNodeResourceRequest> argument =
|
||||
ArgumentCaptor.forClass(UpdateNodeResourceRequest.class);
|
||||
verify(admin).updateNodeResource(argument.capture());
|
||||
UpdateNodeResourceRequest request = argument.getValue();
|
||||
Map<NodeId, ResourceOption> resourceMap = request.getNodeResourceMap();
|
||||
NodeId nodeId = NodeId.fromString(nodeIdStr);
|
||||
Resource expectedResource = Resources.createResource(memSize, cores);
|
||||
ResourceOption resource = resourceMap.get(nodeId);
|
||||
assertNotNull("resource for " + nodeIdStr + " shouldn't be null.",
|
||||
resource);
|
||||
assertEquals("resource value for " + nodeIdStr + " is not as expected.",
|
||||
ResourceOption.newInstance(expectedResource, timeout), resource);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceWithInvalidValue() throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
int memSize = -2048;
|
||||
int cores = 2;
|
||||
String[] args = { "-updateNodeResource", nodeIdStr,
|
||||
Integer.toString(memSize), Integer.toString(cores) };
|
||||
String[] args = {"-updateNodeResource", nodeIdStr,
|
||||
Integer.toString(memSize), Integer.toString(cores)};
|
||||
// execution of command line is expected to be failed
|
||||
assertEquals(-1, rmAdminCLI.run(args));
|
||||
// verify admin protocol never calls.
|
||||
verify(admin,times(0)).updateNodeResource(
|
||||
verify(admin, times(0)).updateNodeResource(
|
||||
any(UpdateNodeResourceRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypes() throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes =
|
||||
"memory-mb=1024Mi,vcores=1,resource1=3Gi,resource2=2m";
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
ArgumentCaptor<UpdateNodeResourceRequest> argument =
|
||||
ArgumentCaptor.forClass(UpdateNodeResourceRequest.class);
|
||||
verify(admin).updateNodeResource(argument.capture());
|
||||
UpdateNodeResourceRequest request = argument.getValue();
|
||||
Map<NodeId, ResourceOption> resourceMap = request.getNodeResourceMap();
|
||||
NodeId nodeId = NodeId.fromString(nodeIdStr);
|
||||
|
||||
Resource expectedResource = Resource.newInstance(1024, 1);
|
||||
expectedResource.setResourceInformation("resource1",
|
||||
ResourceInformation.newInstance("resource1", "Gi", 3));
|
||||
expectedResource.setResourceInformation("resource2",
|
||||
ResourceInformation.newInstance("resource2", "m", 2));
|
||||
|
||||
ResourceOption resource = resourceMap.get(nodeId);
|
||||
assertNotNull("resource for " + nodeIdStr + " shouldn't be null.",
|
||||
resource);
|
||||
assertEquals("resource value for " + nodeIdStr + " is not as expected.",
|
||||
ResourceOption.newInstance(expectedResource,
|
||||
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), resource);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypesWithOverCommitTimeout()
|
||||
throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes =
|
||||
"memory-mb=1024Mi,vcores=1,resource1=3Gi,resource2=2m";
|
||||
int timeout = 1000;
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes,
|
||||
Integer.toString(timeout)};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
ArgumentCaptor<UpdateNodeResourceRequest> argument =
|
||||
ArgumentCaptor.forClass(UpdateNodeResourceRequest.class);
|
||||
verify(admin).updateNodeResource(argument.capture());
|
||||
UpdateNodeResourceRequest request = argument.getValue();
|
||||
Map<NodeId, ResourceOption> resourceMap = request.getNodeResourceMap();
|
||||
NodeId nodeId = NodeId.fromString(nodeIdStr);
|
||||
|
||||
Resource expectedResource = Resource.newInstance(1024, 1);
|
||||
expectedResource.setResourceInformation("resource1",
|
||||
ResourceInformation.newInstance("resource1", "Gi", 3));
|
||||
expectedResource.setResourceInformation("resource2",
|
||||
ResourceInformation.newInstance("resource2", "m", 2));
|
||||
|
||||
ResourceOption resource = resourceMap.get(nodeId);
|
||||
assertNotNull("resource for " + nodeIdStr + " shouldn't be null.",
|
||||
resource);
|
||||
assertEquals("resource value for " + nodeIdStr + " is not as expected.",
|
||||
ResourceOption.newInstance(expectedResource, timeout), resource);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypesWithoutMandatoryResources()
|
||||
throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes = "resource1=3Gi,resource2=2m";
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes};
|
||||
assertEquals(-1, rmAdminCLI.run(args));
|
||||
|
||||
// verify admin protocol never calls.
|
||||
verify(admin, times(0)).updateNodeResource(
|
||||
any(UpdateNodeResourceRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypesWithInvalidResource()
|
||||
throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes =
|
||||
"memory-mb=1024Mi,vcores=1,resource1=3Gi,resource3=2m";
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes};
|
||||
// execution of command line is expected to be failed
|
||||
assertEquals(-1, rmAdminCLI.run(args));
|
||||
// verify admin protocol never calls.
|
||||
verify(admin, times(0)).updateNodeResource(
|
||||
any(UpdateNodeResourceRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypesWithInvalidResourceValue()
|
||||
throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes =
|
||||
"memory-mb=1024Mi,vcores=1,resource1=ABDC,resource2=2m";
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes};
|
||||
// execution of command line is expected to be failed
|
||||
assertEquals(-1, rmAdminCLI.run(args));
|
||||
// verify admin protocol never calls.
|
||||
verify(admin, times(0)).updateNodeResource(
|
||||
any(UpdateNodeResourceRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypesWithInvalidResourceUnit()
|
||||
throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes =
|
||||
"memory-mb=1024Mi,vcores=1,resource1=2XYZ,resource2=2m";
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes};
|
||||
// execution of command line is expected to be failed
|
||||
assertEquals(-1, rmAdminCLI.run(args));
|
||||
// verify admin protocol never calls.
|
||||
verify(admin, times(0)).updateNodeResource(
|
||||
any(UpdateNodeResourceRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypesWithNonAlphaResourceUnit()
|
||||
throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes =
|
||||
"memory-mb=1024M i,vcores=1,resource1=2G,resource2=2m";
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes};
|
||||
// execution of command line is expected to be failed
|
||||
assertEquals(-1, rmAdminCLI.run(args));
|
||||
// verify admin protocol never calls.
|
||||
verify(admin, times(0)).updateNodeResource(
|
||||
any(UpdateNodeResourceRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResourceTypesWithInvalidResourceFormat()
|
||||
throws Exception {
|
||||
String nodeIdStr = "0.0.0.0:0";
|
||||
String resourceTypes = "memory-mb=1024Mi,vcores=1,resource2";
|
||||
String[] args = {"-updateNodeResource", nodeIdStr, resourceTypes};
|
||||
// execution of command line is expected to be failed
|
||||
assertEquals(-1, rmAdminCLI.run(args));
|
||||
// verify admin protocol never calls.
|
||||
verify(admin, times(0)).updateNodeResource(
|
||||
any(UpdateNodeResourceRequest.class));
|
||||
}
|
||||
|
||||
|
@ -513,8 +704,10 @@ public class TestRMAdminCLI {
|
|||
"<\"node1[:port]=label1,label2 node2[:port]=label1\"> " +
|
||||
"[-failOnUnknownNodes]] " +
|
||||
"[-directlyAccessNodeLabelStore] [-refreshClusterMaxPriority] " +
|
||||
"[-updateNodeResource [NodeID] [MemSize] [vCores] " +
|
||||
"([OvercommitTimeout]) [-help [cmd]]"));
|
||||
"[-updateNodeResource [NodeID] [MemSize] [vCores] "
|
||||
+ "([OvercommitTimeout]) or -updateNodeResource "
|
||||
+ "[NodeID] [ResourceTypes] ([OvercommitTimeout])] "
|
||||
+ "[-help [cmd]]"));
|
||||
assertTrue(dataOut
|
||||
.toString()
|
||||
.contains(
|
||||
|
@ -610,6 +803,8 @@ public class TestRMAdminCLI {
|
|||
+ "[-refreshClusterMaxPriority] "
|
||||
+ "[-updateNodeResource [NodeID] [MemSize] [vCores] "
|
||||
+ "([OvercommitTimeout]) "
|
||||
+ "or -updateNodeResource [NodeID] [ResourceTypes] "
|
||||
+ "([OvercommitTimeout])] "
|
||||
+ "[-transitionToActive [--forceactive] <serviceId>] "
|
||||
+ "[-transitionToStandby <serviceId>] "
|
||||
+ "[-getServiceState <serviceId>] [-getAllServiceState] "
|
||||
|
|
|
@ -212,7 +212,7 @@ Usage:
|
|||
-replaceLabelsOnNode <"node1[:port]=label1,label2 node2[:port]=label1,label2"> [-failOnUnknownNodes]
|
||||
-directlyAccessNodeLabelStore
|
||||
-refreshClusterMaxPriority
|
||||
-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])
|
||||
-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout]) or -updateNodeResource [NodeID] [ResourceTypes] ([OvercommitTimeout])
|
||||
-transitionToActive [--forceactive] <serviceId>
|
||||
-transitionToStandby <serviceId>
|
||||
-failover [--forcefence] [--forceactive] <serviceId> <serviceId>
|
||||
|
@ -238,6 +238,7 @@ Usage:
|
|||
| -directlyAccessNodeLabelStore | This is DEPRECATED, will be removed in future releases. Directly access node label store, with this option, all node label related operations will not connect RM. Instead, they will access/modify stored node labels directly. By default, it is false (access via RM). AND PLEASE NOTE: if you configured yarn.node-labels.fs-store.root-dir to a local directory (instead of NFS or HDFS), this option will only work when the command run on the machine where RM is running. |
|
||||
| -refreshClusterMaxPriority | Refresh cluster max priority |
|
||||
| -updateNodeResource [NodeID] [MemSize] [vCores] \([OvercommitTimeout]\) | Update resource on specific node. |
|
||||
| -updateNodeResource [NodeID] [ResourceTypes] \([OvercommitTimeout]\) | Update resource types on specific node. Resource Types is comma-delimited key value pairs of any resources availale at Resource Manager. For example, memory-mb=1024Mi,vcores=1,resource1=2G,resource2=4m|
|
||||
| -transitionToActive [--forceactive] [--forcemanual] \<serviceId\> | Transitions the service into Active state. Try to make the target active without checking that there is no active node if the --forceactive option is used. This command can not be used if automatic failover is enabled. Though you can override this by --forcemanual option, you need caution. This command can not be used if automatic failover is enabled.|
|
||||
| -transitionToStandby [--forcemanual] \<serviceId\> | Transitions the service into Standby state. This command can not be used if automatic failover is enabled. Though you can override this by --forcemanual option, you need caution. |
|
||||
| -failover [--forceactive] \<serviceId1\> \<serviceId2\> | Initiate a failover from serviceId1 to serviceId2. Try to failover to the target service even if it is not ready if the --forceactive option is used. This command can not be used if automatic failover is enabled. |
|
||||
|
|
Loading…
Reference in New Issue