YARN-9034. ApplicationCLI should have option to take clusterId. Contributed by Rohith Sharma K S.

This commit is contained in:
Suma Shivaprasad 2018-11-28 00:42:00 -08:00
parent 34a914be03
commit 7dc272199f
9 changed files with 92 additions and 73 deletions

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
@ -113,6 +114,7 @@ public class ApplicationCLI extends YarnCLI {
public static final String VERSION = "version";
public static final String STATES = "states";
public static final String SHELL_CMD = "shell";
public static final String CLUSTER_ID_OPTION = "clusterId";
private static String firstArg = null;
@ -278,6 +280,8 @@ public int run(String[] args) throws Exception {
"the ability to finalize the upgrade automatically.");
opts.addOption(UPGRADE_CANCEL, false, "Works with -upgrade option to " +
"cancel current upgrade.");
opts.addOption(CLUSTER_ID_OPTION, true, "ClusterId. "
+ "By default, it will take default cluster id from the RM");
opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name");
opts.getOption(LAUNCH_CMD).setArgs(2);
opts.getOption(START_CMD).setArgName("Application Name");
@ -302,6 +306,7 @@ public int run(String[] args) throws Exception {
opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES);
opts.getOption(DECOMMISSION).setArgName("Application Name");
opts.getOption(DECOMMISSION).setArgs(1);
opts.getOption(CLUSTER_ID_OPTION).setArgName("Cluster ID");
} else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) {
opts.addOption(STATUS_CMD, true,
"Prints the status of the application attempt.");
@ -309,9 +314,12 @@ public int run(String[] args) throws Exception {
"List application attempts for application.");
opts.addOption(FAIL_CMD, true, "Fails application attempt.");
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
opts.addOption(CLUSTER_ID_OPTION, true, "ClusterId. "
+ "By default, it will take default cluster id from the RM");
opts.getOption(STATUS_CMD).setArgName("Application Attempt ID");
opts.getOption(LIST_CMD).setArgName("Application ID");
opts.getOption(FAIL_CMD).setArgName("Application Attempt ID");
opts.getOption(CLUSTER_ID_OPTION).setArgName("Cluster ID");
} else if (title != null && title.equalsIgnoreCase(CONTAINER)) {
opts.addOption(SHELL_CMD, true,
"Run a shell in the container.");
@ -358,6 +366,9 @@ public int run(String[] args) throws Exception {
" Default command is OUTPUT_THREAD_DUMP.");
opts.getOption(SIGNAL_CMD).setArgName("container ID [signal command]");
opts.getOption(SIGNAL_CMD).setArgs(3);
opts.addOption(CLUSTER_ID_OPTION, true, "ClusterId. "
+ "By default, it will take default cluster id from the RM");
opts.getOption(CLUSTER_ID_OPTION).setArgName("Cluster ID");
}
int exitCode = -1;
@ -382,6 +393,12 @@ public int run(String[] args) throws Exception {
}
}
if (cliParser.hasOption(CLUSTER_ID_OPTION)) {
String clusterIdStr = cliParser.getOptionValue(CLUSTER_ID_OPTION);
getConf().set(YarnConfiguration.RM_CLUSTER_ID, clusterIdStr);
}
createAndStartYarnClient();
if (cliParser.hasOption(STATUS_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, STATUS_CMD, APP_TYPE_CMD)) {
printUsage(title, opts);

View File

@ -100,6 +100,8 @@ public int run(String[] args) throws Exception {
return exitCode;
}
createAndStartYarnClient();
if (parsedCli.hasOption(DIRECTLY_ACCESS_NODE_LABEL_STORE)) {
accessLocal = true;
}

View File

@ -107,6 +107,8 @@ public int run(String[] args) throws Exception {
return exitCode;
}
createAndStartYarnClient();
if (cliParser.hasOption("status")) {
if (args.length != 2) {
printUsage(opts);

View File

@ -69,7 +69,7 @@ public int run(String[] args) throws Exception {
printUsage(opts);
return -1;
}
createAndStartYarnClient();
if (cliParser.hasOption(STATUS_CMD)) {
if (args.length != 2) {
printUsage(opts);

View File

@ -463,6 +463,7 @@ public int run(String[] args) throws Exception {
LOG.error("Unable to parse options", e);
return 1;
}
createAndStartYarnClient();
setAppsHeader();
Thread keyboardMonitor = new KeyboardMonitor();

View File

@ -43,15 +43,18 @@ public abstract class YarnCLI extends Configured implements Tool {
public YarnCLI() {
super(new YarnConfiguration());
client = createYarnClient();
client.init(getConf());
client.start();
}
protected YarnClient createYarnClient() {
return YarnClient.createYarnClient();
}
protected void createAndStartYarnClient() {
client = createYarnClient();
client.init(getConf());
client.start();
}
public void setSysOutPrintStream(PrintStream sysout) {
this.sysout = sysout;
}

View File

@ -42,10 +42,12 @@
import com.google.common.collect.ImmutableSet;
public class TestClusterCLI {
ByteArrayOutputStream sysOutStream;
private PrintStream sysOut;
ByteArrayOutputStream sysErrStream;
private PrintStream sysErr;
private YarnClient client = mock(YarnClient.class);
@Before
public void setup() {
@ -58,14 +60,10 @@ public void setup() {
@Test
public void testGetClusterNodeLabels() throws Exception {
YarnClient client = mock(YarnClient.class);
when(client.getClusterNodeLabels()).thenReturn(
Arrays.asList(NodeLabel.newInstance("label1"),
NodeLabel.newInstance("label2")));
ClusterCLI cli = new ClusterCLI();
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
ClusterCLI cli = createAndGetClusterCLI();
int rc =
cli.run(new String[] { ClusterCLI.CMD, "-" + ClusterCLI.LIST_LABELS_CMD });
@ -80,16 +78,12 @@ public void testGetClusterNodeLabels() throws Exception {
@Test
public void testGetClusterNodeAttributes() throws Exception {
YarnClient client = mock(YarnClient.class);
when(client.getClusterAttributes()).thenReturn(ImmutableSet
.of(NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
NodeAttributeType.STRING), NodeAttributeInfo
.newInstance(NodeAttributeKey.newInstance("CPU"),
NodeAttributeType.STRING)));
ClusterCLI cli = new ClusterCLI();
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
ClusterCLI cli = createAndGetClusterCLI();
int rc = cli.run(new String[] {ClusterCLI.CMD,
"-" + ClusterCLI.LIST_CLUSTER_ATTRIBUTES});
@ -105,14 +99,10 @@ public void testGetClusterNodeAttributes() throws Exception {
@Test
public void testGetClusterNodeLabelsWithLocalAccess() throws Exception {
YarnClient client = mock(YarnClient.class);
when(client.getClusterNodeLabels()).thenReturn(
Arrays.asList(NodeLabel.newInstance("remote1"),
NodeLabel.newInstance("remote2")));
ClusterCLI cli = new ClusterCLI();
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
ClusterCLI cli = createAndGetClusterCLI();
ClusterCLI.localNodeLabelsManager = mock(CommonNodeLabelsManager.class);
when(ClusterCLI.localNodeLabelsManager.getClusterNodeLabels()).thenReturn(
Arrays.asList(NodeLabel.newInstance("local1"),
@ -134,12 +124,8 @@ public void testGetClusterNodeLabelsWithLocalAccess() throws Exception {
@Test
public void testGetEmptyClusterNodeLabels() throws Exception {
YarnClient client = mock(YarnClient.class);
when(client.getClusterNodeLabels()).thenReturn(new ArrayList<NodeLabel>());
ClusterCLI cli = new ClusterCLI();
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
ClusterCLI cli = createAndGetClusterCLI();
int rc =
cli.run(new String[] { ClusterCLI.CMD, "-" + ClusterCLI.LIST_LABELS_CMD });
@ -154,9 +140,7 @@ public void testGetEmptyClusterNodeLabels() throws Exception {
@Test
public void testHelp() throws Exception {
ClusterCLI cli = new ClusterCLI();
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
ClusterCLI cli = createAndGetClusterCLI();
int rc =
cli.run(new String[] { "cluster", "--help" });
@ -192,4 +176,15 @@ public void testHelp() throws Exception {
pw.close();
verify(sysOut).println(baos.toString("UTF-8"));
}
private ClusterCLI createAndGetClusterCLI() {
ClusterCLI cli = new ClusterCLI() {
@Override protected void createAndStartYarnClient() {
}
};
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
return cli;
}
}

View File

@ -993,10 +993,7 @@ public void testContainersHelpCommand() throws Exception {
@Test (timeout = 5000)
public void testNodesHelpCommand() throws Exception {
NodeCLI nodeCLI = new NodeCLI();
nodeCLI.setClient(client);
nodeCLI.setSysOutPrintStream(sysOut);
nodeCLI.setSysErrPrintStream(sysErr);
NodeCLI nodeCLI = createAndGetNodeCLI();
nodeCLI.run(new String[] {});
Assert.assertEquals(createNodeCLIHelpMessage(),
sysOutStream.toString());
@ -1290,9 +1287,7 @@ public void testListClusterNodes() throws Exception {
nodeReports.addAll(getNodeReports(1, NodeState.REBOOTED));
nodeReports.addAll(getNodeReports(1, NodeState.LOST));
NodeCLI cli = new NodeCLI();
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
NodeCLI cli = createAndGetNodeCLI();
Set<NodeState> nodeStates = new HashSet<NodeState>();
nodeStates.add(NodeState.NEW);
@ -1545,12 +1540,9 @@ private List<NodeReport> getNodeReports(
@Test
public void testNodeStatus() throws Exception {
NodeId nodeId = NodeId.newInstance("host0", 0);
NodeCLI cli = new NodeCLI();
when(client.getNodeReports())
.thenReturn(getNodeReports(3, NodeState.RUNNING, false, false, false));
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
NodeCLI cli = createAndGetNodeCLI();
int result = cli.run(new String[] { "-status", nodeId.toString() });
assertEquals(0, result);
verify(client).getNodeReports();
@ -1583,12 +1575,9 @@ public void testNodeStatus() throws Exception {
@Test
public void testNodeStatusWithEmptyNodeLabels() throws Exception {
NodeId nodeId = NodeId.newInstance("host0", 0);
NodeCLI cli = new NodeCLI();
when(client.getNodeReports()).thenReturn(
getNodeReports(3, NodeState.RUNNING));
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
NodeCLI cli = createAndGetNodeCLI();
int result = cli.run(new String[] { "-status", nodeId.toString() });
assertEquals(0, result);
verify(client).getNodeReports();
@ -1620,12 +1609,9 @@ public void testNodeStatusWithEmptyNodeLabels() throws Exception {
@Test
public void testNodeStatusWithEmptyResourceUtilization() throws Exception {
NodeId nodeId = NodeId.newInstance("host0", 0);
NodeCLI cli = new NodeCLI();
when(client.getNodeReports())
.thenReturn(getNodeReports(3, NodeState.RUNNING, false, true, true));
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
NodeCLI cli = createAndGetNodeCLI();
int result = cli.run(new String[] { "-status", nodeId.toString() });
assertEquals(0, result);
verify(client).getNodeReports();
@ -1657,12 +1643,10 @@ public void testNodeStatusWithEmptyResourceUtilization() throws Exception {
@Test
public void testAbsentNodeStatus() throws Exception {
NodeId nodeId = NodeId.newInstance("Absenthost0", 0);
NodeCLI cli = new NodeCLI();
when(client.getNodeReports()).thenReturn(
getNodeReports(0, NodeState.RUNNING));
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
NodeCLI cli = createAndGetNodeCLI();
int result = cli.run(new String[] { "-status", nodeId.toString() });
assertEquals(0, result);
verify(client).getNodeReports();
@ -1702,10 +1686,7 @@ public void testMissingArguments() throws Exception {
createContainerCLIHelpMessage()), normalize(sysOutStream.toString()));
sysOutStream.reset();
NodeCLI nodeCLI = new NodeCLI();
nodeCLI.setClient(client);
nodeCLI.setSysOutPrintStream(sysOut);
nodeCLI.setSysErrPrintStream(sysErr);
NodeCLI nodeCLI = createAndGetNodeCLI();
result = nodeCLI.run(new String[] { "-status" });
Assert.assertEquals(result, -1);
Assert.assertEquals(String.format("Missing argument for options%n%1s",
@ -1774,10 +1755,7 @@ public void testGetQueueInfoOverrideIntraQueuePreemption() throws Exception {
yarnClient.init(yarnConf);
yarnClient.start();
QueueCLI cli = new QueueCLI();
cli.setClient(yarnClient);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
QueueCLI cli = createAndGetQueueCLI(yarnClient);
sysOutStream.reset();
// Get status for the root.a queue
int result = cli.run(new String[] { "-status", "a" });
@ -1788,10 +1766,7 @@ public void testGetQueueInfoOverrideIntraQueuePreemption() throws Exception {
// In-queue preemption is disabled at the "root.a" queue level
Assert.assertTrue(queueStatusOut
.contains("Intra-queue Preemption : disabled"));
cli = new QueueCLI();
cli.setClient(yarnClient);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
cli = createAndGetQueueCLI(yarnClient);
sysOutStream.reset();
// Get status for the root.a.a1 queue
result = cli.run(new String[] { "-status", "a1" });
@ -1836,10 +1811,7 @@ public void testGetQueueInfoPreemptionEnabled() throws Exception {
yarnClient.init(yarnConf);
yarnClient.start();
QueueCLI cli = new QueueCLI();
cli.setClient(yarnClient);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
QueueCLI cli = createAndGetQueueCLI(yarnClient);
sysOutStream.reset();
int result = cli.run(new String[] { "-status", "a1" });
assertEquals(0, result);
@ -1880,10 +1852,7 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
yarnClient.init(yarnConf);
yarnClient.start();
QueueCLI cli = new QueueCLI();
cli.setClient(yarnClient);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
QueueCLI cli = createAndGetQueueCLI(yarnClient);
sysOutStream.reset();
int result = cli.run(new String[] { "-status", "a1" });
assertEquals(0, result);
@ -2101,14 +2070,36 @@ private List<NodeReport> getNodeReports(int noOfNodes, NodeState state,
}
private ApplicationCLI createAndGetAppCLI() {
ApplicationCLI cli = new ApplicationCLI();
ApplicationCLI cli = new ApplicationCLI() {
@Override protected void createAndStartYarnClient() {
}
};
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
return cli;
}
private QueueCLI createAndGetQueueCLI() {
QueueCLI cli = new QueueCLI();
return createAndGetQueueCLI(client);
}
private QueueCLI createAndGetQueueCLI(YarnClient client) {
QueueCLI cli = new QueueCLI() {
@Override protected void createAndStartYarnClient() {
}
};
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
return cli;
}
private NodeCLI createAndGetNodeCLI() {
NodeCLI cli = new NodeCLI() {
@Override protected void createAndStartYarnClient() {
}
};
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
@ -2152,6 +2143,9 @@ private String createApplicationCLIHelpMessage() throws IOException {
pw.println(" deprecated, this new command");
pw.println(" 'changeQueue' performs same");
pw.println(" functionality.");
pw.println(" -clusterId <Cluster ID> ClusterId. By default, it will");
pw.println(" take default cluster id from the");
pw.println(" RM");
pw.println(" -component <Component Name> <Count> Works with -flex option to");
pw.println(" change the number of");
pw.println(" components/containers running");
@ -2298,6 +2292,8 @@ private String createApplicationAttemptCLIHelpMessage() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: applicationattempt");
pw.println(" -clusterId <Cluster ID> ClusterId. By default, it will take");
pw.println(" default cluster id from the RM");
pw.println(" -fail <Application Attempt ID> Fails application attempt.");
pw.println(" -help Displays help for all commands.");
pw.println(" -list <Application ID> List application attempts for");
@ -2314,6 +2310,7 @@ private String createContainerCLIHelpMessage() throws IOException {
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: container");
pw.println(" -appTypes <Types> Works with -list to specify the app type when application name is provided.");
pw.println(" -clusterId <Cluster ID> ClusterId. By default, it will take default cluster id from the RM ");
pw.println(" -components <arg> Works with -list to filter instances based on input comma-separated list of component names.");
pw.println(" -help Displays help for all commands.");
pw.println(" -list <Application Name or Attempt ID> List containers for application attempt when application attempt ID is provided. When application name is provided, then it finds the instances of the application based on app's own implementation, and -appTypes option must be specified unless it is the default yarn-service type. With app name, it supports optional use of -version to filter instances based on app version, -components to filter instances based on component names, -states to filter instances based on instance state.");

View File

@ -94,6 +94,8 @@ protected void serviceInit(Configuration conf) throws Exception {
conf, timelineReaderWebAppAddress, RESOURCE_URI_STR_V2);
clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID,
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
LOG.info("Initialized TimelineReader URI=" + baseUri + ", clusterId="
+ clusterId);
super.serviceInit(conf);
}